Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / tests / pools / thread / fixed.test.mjs
CommitLineData
a074ffee
JB
1import { expect } from 'expect'
2import { FixedThreadPool, PoolEvents } from '../../../lib/index.js'
3import { TaskFunctions } from '../../test-types.js'
4import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
e1ffb94f 8 const numberOfThreads = 6
4e377863 9 const tasksConcurrency = 2
e1ffb94f
JB
10 const pool = new FixedThreadPool(
11 numberOfThreads,
b2fd3f4a 12 './tests/worker-files/thread/testWorker.mjs',
e1ffb94f 13 {
041dc05b 14 errorHandler: e => console.error(e)
e1ffb94f
JB
15 }
16 )
594bfb84
JB
17 const queuePool = new FixedThreadPool(
18 numberOfThreads,
b2fd3f4a 19 './tests/worker-files/thread/testWorker.mjs',
594bfb84
JB
20 {
21 enableTasksQueue: true,
22 tasksQueueOptions: {
4e377863 23 concurrency: tasksConcurrency
594bfb84 24 },
041dc05b 25 errorHandler: e => console.error(e)
594bfb84
JB
26 }
27 )
e1ffb94f
JB
28 const emptyPool = new FixedThreadPool(
29 numberOfThreads,
b2fd3f4a 30 './tests/worker-files/thread/emptyWorker.mjs',
73bfd59d 31 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
32 )
33 const echoPool = new FixedThreadPool(
34 numberOfThreads,
b2fd3f4a 35 './tests/worker-files/thread/echoWorker.mjs'
e1ffb94f
JB
36 )
37 const errorPool = new FixedThreadPool(
38 numberOfThreads,
b2fd3f4a 39 './tests/worker-files/thread/errorWorker.mjs',
e1ffb94f 40 {
041dc05b 41 errorHandler: e => console.error(e)
e1ffb94f
JB
42 }
43 )
44 const asyncErrorPool = new FixedThreadPool(
45 numberOfThreads,
b2fd3f4a 46 './tests/worker-files/thread/asyncErrorWorker.mjs',
e1ffb94f 47 {
041dc05b 48 errorHandler: e => console.error(e)
e1ffb94f
JB
49 }
50 )
51 const asyncPool = new FixedThreadPool(
52 numberOfThreads,
b2fd3f4a 53 './tests/worker-files/thread/asyncWorker.mjs'
e1ffb94f
JB
54 )
55
0e2503fc
JB
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
7c0ba920 61 await asyncErrorPool.destroy()
0e2503fc 62 await emptyPool.destroy()
594bfb84 63 await queuePool.destroy()
0e2503fc
JB
64 })
65
506c2a14 66 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9 67 let result = await pool.execute({
dbca3be9 68 function: TaskFunctions.fibonacci
6db75ad9 69 })
024daf59 70 expect(result).toBe(75025)
6db75ad9 71 result = await pool.execute({
dbca3be9 72 function: TaskFunctions.factorial
6db75ad9 73 })
70a4f5ea 74 expect(result).toBe(9.33262154439441e157)
506c2a14 75 })
76
318d4156 77 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 78 const result = await pool.execute()
30b963d4 79 expect(result).toStrictEqual({ ok: 1 })
106744f7 80 })
81
1dcbfefc 82 it("Verify that 'ready' event is emitted", async () => {
0fe39c97 83 const pool = new FixedThreadPool(
079de991 84 numberOfThreads,
b2fd3f4a 85 './tests/worker-files/thread/testWorker.mjs',
079de991 86 {
041dc05b 87 errorHandler: e => console.error(e)
079de991
JB
88 }
89 )
c726f66c 90 expect(pool.emitter.eventNames()).toStrictEqual([])
079de991 91 let poolReady = 0
0fe39c97
JB
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
216541b6 95 expect(poolReady).toBe(1)
0fe39c97 96 await pool.destroy()
216541b6
JB
97 })
98
94407def
JB
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
c726f66c 101 expect(pool.emitter.eventNames()).toStrictEqual([])
7c0ba920 102 let poolBusy = 0
aee46736 103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
c726f66c 104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
7c0ba920 105 for (let i = 0; i < numberOfThreads * 2; i++) {
94407def 106 promises.add(pool.execute())
7c0ba920 107 }
94407def 108 await Promise.all(promises)
14916bf9
JB
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
112 })
113
594bfb84 114 it('Verify that tasks queuing is working', async () => {
d3d4b67d 115 const promises = new Set()
4e377863 116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 117 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 118 promises.add(queuePool.execute())
594bfb84 119 }
d3d4b67d 120 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 121 for (const workerNode of queuePool.workerNodes) {
5bb5be17 122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
124 queuePool.opts.tasksQueueOptions.concurrency
125 )
f59e1027 126 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
129 )
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
132 )
463226a4 133 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
1f0766e7 134 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 135 }
1f0766e7 136 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
137 expect(queuePool.info.executingTasks).toBe(
138 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
139 )
6b27d407 140 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
141 numberOfThreads *
142 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
143 )
144 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
145 numberOfThreads *
146 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 147 )
a1763c54 148 expect(queuePool.info.backPressure).toBe(false)
1f0766e7 149 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
150 await Promise.all(promises)
151 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
152 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
153 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
154 numberOfThreads * maxMultiplier
155 )
4e377863 156 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 157 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
158 expect(workerNode.usage.tasks.maxQueued).toBe(
159 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
160 )
463226a4
JB
161 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
162 0
163 )
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
165 numberOfThreads * maxMultiplier
166 )
1f0766e7
JB
167 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
168 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
169 numberOfThreads * maxMultiplier
170 )
594bfb84 171 }
1f0766e7
JB
172 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
173 expect(queuePool.info.backPressure).toBe(false)
174 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
175 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
176 numberOfThreads * maxMultiplier
177 )
594bfb84
JB
178 })
179
106744f7 180 it('Verify that is possible to have a worker that return undefined', async () => {
181 const result = await emptyPool.execute()
6db75ad9 182 expect(result).toBeUndefined()
106744f7 183 })
184
185 it('Verify that data are sent to the worker correctly', async () => {
186 const data = { f: 10 }
187 const result = await echoPool.execute(data)
e1ffb94f 188 expect(result).toStrictEqual(data)
106744f7 189 })
190
9ea61037 191 it('Verify that transferable objects are sent to the worker correctly', async () => {
9ea61037
JB
192 let error
193 let result
194 try {
f05314ff
JB
195 result = await pool.execute(undefined, undefined, [
196 new ArrayBuffer(16),
197 new MessageChannel().port1
198 ])
9ea61037
JB
199 } catch (e) {
200 error = e
201 }
202 expect(result).toStrictEqual({ ok: 1 })
203 expect(error).toBeUndefined()
f05314ff
JB
204 try {
205 result = await pool.execute(undefined, undefined, [
206 new SharedArrayBuffer(16)
207 ])
208 } catch (e) {
209 error = e
210 }
211 expect(result).toStrictEqual({ ok: 1 })
4a8faf08
JB
212 expect(error).toBeInstanceOf(Error)
213 expect(error.message).toMatch(
214 /Found invalid (object|value) in transferList/
f05314ff 215 )
9ea61037
JB
216 })
217
7c0ba920 218 it('Verify that error handling is working properly:sync', async () => {
106744f7 219 const data = { f: 10 }
c726f66c 220 expect(errorPool.emitter.eventNames()).toStrictEqual([])
d46660cd 221 let taskError
041dc05b 222 errorPool.emitter.on(PoolEvents.taskError, e => {
d46660cd
JB
223 taskError = e
224 })
c726f66c 225 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
106744f7 226 let inError
227 try {
228 await errorPool.execute(data)
229 } catch (e) {
230 inError = e
231 }
7c0ba920
JB
232 expect(inError).toBeDefined()
233 expect(inError).toBeInstanceOf(Error)
234 expect(inError.message).toBeDefined()
8620fb25 235 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
236 expect(inError.message).toBe('Error Message from ThreadWorker')
237 expect(taskError).toStrictEqual({
6cd5248f 238 name: DEFAULT_TASK_NAME,
985d0e79
JB
239 message: new Error('Error Message from ThreadWorker'),
240 data
241 })
18482cec
JB
242 expect(
243 errorPool.workerNodes.some(
041dc05b 244 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
245 )
246 ).toBe(true)
7c0ba920
JB
247 })
248
249 it('Verify that error handling is working properly:async', async () => {
250 const data = { f: 10 }
c726f66c 251 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
4f0b85b3 252 let taskError
041dc05b 253 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
4f0b85b3
JB
254 taskError = e
255 })
c726f66c
JB
256 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
257 PoolEvents.taskError
258 ])
7c0ba920
JB
259 let inError
260 try {
261 await asyncErrorPool.execute(data)
262 } catch (e) {
263 inError = e
264 }
265 expect(inError).toBeDefined()
266 expect(inError).toBeInstanceOf(Error)
267 expect(inError.message).toBeDefined()
8620fb25 268 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
269 expect(inError.message).toBe('Error Message from ThreadWorker:async')
270 expect(taskError).toStrictEqual({
6cd5248f 271 name: DEFAULT_TASK_NAME,
985d0e79
JB
272 message: new Error('Error Message from ThreadWorker:async'),
273 data
274 })
18482cec
JB
275 expect(
276 asyncErrorPool.workerNodes.some(
041dc05b 277 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
278 )
279 ).toBe(true)
106744f7 280 })
281
7784f548 282 it('Verify that async function is working properly', async () => {
283 const data = { f: 10 }
15e5141f 284 const startTime = performance.now()
7784f548 285 const result = await asyncPool.execute(data)
15e5141f 286 const usedTime = performance.now() - startTime
e1ffb94f 287 expect(result).toStrictEqual(data)
32d490eb 288 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 289 })
290
506c2a14 291 it('Shutdown test', async () => {
bac873bd 292 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
b04bd0f1 293 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
ef3891a3
JB
294 let poolDestroy = 0
295 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
b04bd0f1
JB
296 expect(pool.emitter.eventNames()).toStrictEqual([
297 PoolEvents.busy,
298 PoolEvents.destroy
299 ])
1f9a5a44 300 await pool.destroy()
bdacc2d2 301 const numberOfExitEvents = await exitPromise
bb9423b7 302 expect(pool.started).toBe(false)
55082af9 303 expect(pool.readyEventEmitted).toBe(false)
76efd044 304 expect(pool.emitter.eventNames()).toStrictEqual([])
bb9423b7 305 expect(pool.workerNodes.length).toBe(0)
bdacc2d2 306 expect(numberOfExitEvents).toBe(numberOfThreads)
ef3891a3 307 expect(poolDestroy).toBe(1)
506c2a14 308 })
309
90082c8c 310 it('Verify that thread pool options are checked', async () => {
b2fd3f4a 311 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
0fe39c97
JB
312 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
313 expect(pool.opts.workerOptions).toBeUndefined()
314 await pool.destroy()
315 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
90082c8c
JB
316 workerOptions: {
317 env: { TEST: 'test' },
318 name: 'test'
319 }
320 })
0fe39c97 321 expect(pool.opts.workerOptions).toStrictEqual({
90082c8c
JB
322 env: { TEST: 'test' },
323 name: 'test'
324 })
0fe39c97 325 await pool.destroy()
90082c8c
JB
326 })
327
506c2a14 328 it('Should work even without opts in input', async () => {
b2fd3f4a 329 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 330 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
0fe39c97 331 const res = await pool.execute()
30b963d4 332 expect(res).toStrictEqual({ ok: 1 })
0e2503fc 333 // We need to clean up the resources after our test
0fe39c97 334 await pool.destroy()
506c2a14 335 })
8d3782fa 336
7d27ec0d 337 it('Verify destroyWorkerNode()', async () => {
b2fd3f4a 338 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 339 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
52a23942 340 const workerNodeKey = 0
7d27ec0d 341 let exitEvent = 0
52a23942 342 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
7d27ec0d
JB
343 ++exitEvent
344 })
52a23942 345 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
7d27ec0d
JB
346 expect(exitEvent).toBe(1)
347 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
348 await pool.destroy()
349 })
350
1cc6e9ef 351 it('Verify that a pool with zero worker fails', () => {
8d3782fa 352 expect(
b2fd3f4a 353 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
948faff7 354 ).toThrow('Cannot instantiate a fixed pool with zero worker')
8d3782fa 355 })
506c2a14 356})