refactor: use PoolEvents enum
[poolifier.git] / tests / pools / thread / fixed.test.mjs
CommitLineData
a074ffee 1import { expect } from 'expect'
ded253e2 2
d35e5717 3import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
ded253e2 4import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
d35e5717
JB
5import { TaskFunctions } from '../../test-types.cjs'
6import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
506c2a14 7
079de991 8describe('Fixed thread pool test suite', () => {
e1ffb94f 9 const numberOfThreads = 6
4e377863 10 const tasksConcurrency = 2
e1ffb94f
JB
11 const pool = new FixedThreadPool(
12 numberOfThreads,
b2fd3f4a 13 './tests/worker-files/thread/testWorker.mjs',
e1ffb94f 14 {
041dc05b 15 errorHandler: e => console.error(e)
e1ffb94f
JB
16 }
17 )
594bfb84
JB
18 const queuePool = new FixedThreadPool(
19 numberOfThreads,
b2fd3f4a 20 './tests/worker-files/thread/testWorker.mjs',
594bfb84
JB
21 {
22 enableTasksQueue: true,
23 tasksQueueOptions: {
4e377863 24 concurrency: tasksConcurrency
594bfb84 25 },
041dc05b 26 errorHandler: e => console.error(e)
594bfb84
JB
27 }
28 )
e1ffb94f
JB
29 const emptyPool = new FixedThreadPool(
30 numberOfThreads,
b2fd3f4a 31 './tests/worker-files/thread/emptyWorker.mjs',
73bfd59d 32 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
33 )
34 const echoPool = new FixedThreadPool(
35 numberOfThreads,
b2fd3f4a 36 './tests/worker-files/thread/echoWorker.mjs'
e1ffb94f
JB
37 )
38 const errorPool = new FixedThreadPool(
39 numberOfThreads,
b2fd3f4a 40 './tests/worker-files/thread/errorWorker.mjs',
e1ffb94f 41 {
041dc05b 42 errorHandler: e => console.error(e)
e1ffb94f
JB
43 }
44 )
45 const asyncErrorPool = new FixedThreadPool(
46 numberOfThreads,
b2fd3f4a 47 './tests/worker-files/thread/asyncErrorWorker.mjs',
e1ffb94f 48 {
041dc05b 49 errorHandler: e => console.error(e)
e1ffb94f
JB
50 }
51 )
52 const asyncPool = new FixedThreadPool(
53 numberOfThreads,
b2fd3f4a 54 './tests/worker-files/thread/asyncWorker.mjs'
e1ffb94f
JB
55 )
56
0e2503fc
JB
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
7c0ba920 62 await asyncErrorPool.destroy()
0e2503fc 63 await emptyPool.destroy()
594bfb84 64 await queuePool.destroy()
0e2503fc
JB
65 })
66
506c2a14 67 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9 68 let result = await pool.execute({
dbca3be9 69 function: TaskFunctions.fibonacci
6db75ad9 70 })
024daf59 71 expect(result).toBe(75025)
6db75ad9 72 result = await pool.execute({
dbca3be9 73 function: TaskFunctions.factorial
6db75ad9 74 })
70a4f5ea 75 expect(result).toBe(9.33262154439441e157)
506c2a14 76 })
77
318d4156 78 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 79 const result = await pool.execute()
30b963d4 80 expect(result).toStrictEqual({ ok: 1 })
106744f7 81 })
82
1dcbfefc 83 it("Verify that 'ready' event is emitted", async () => {
0fe39c97 84 const pool = new FixedThreadPool(
079de991 85 numberOfThreads,
b2fd3f4a 86 './tests/worker-files/thread/testWorker.mjs',
079de991 87 {
041dc05b 88 errorHandler: e => console.error(e)
079de991
JB
89 }
90 )
c726f66c 91 expect(pool.emitter.eventNames()).toStrictEqual([])
079de991 92 let poolReady = 0
0fe39c97
JB
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
216541b6 96 expect(poolReady).toBe(1)
0fe39c97 97 await pool.destroy()
216541b6
JB
98 })
99
94407def
JB
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
c726f66c 102 expect(pool.emitter.eventNames()).toStrictEqual([])
7c0ba920 103 let poolBusy = 0
aee46736 104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
c726f66c 105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
7c0ba920 106 for (let i = 0; i < numberOfThreads * 2; i++) {
94407def 107 promises.add(pool.execute())
7c0ba920 108 }
94407def 109 await Promise.all(promises)
14916bf9
JB
110 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
111 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
113 })
114
594bfb84 115 it('Verify that tasks queuing is working', async () => {
d3d4b67d 116 const promises = new Set()
4e377863 117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 118 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 119 promises.add(queuePool.execute())
594bfb84 120 }
d3d4b67d 121 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 122 for (const workerNode of queuePool.workerNodes) {
5bb5be17 123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
125 queuePool.opts.tasksQueueOptions.concurrency
126 )
f59e1027 127 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 )
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 )
463226a4 134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
1f0766e7 135 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 136 }
1f0766e7 137 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
140 )
6b27d407 141 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
142 numberOfThreads *
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
144 )
145 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
146 numberOfThreads *
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 148 )
a1763c54 149 expect(queuePool.info.backPressure).toBe(false)
1f0766e7 150 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
151 await Promise.all(promises)
152 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfThreads * maxMultiplier
156 )
4e377863 157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 158 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
161 )
463226a4
JB
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
163 0
164 )
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfThreads * maxMultiplier
167 )
1f0766e7
JB
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfThreads * maxMultiplier
171 )
594bfb84 172 }
1f0766e7
JB
173 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfThreads * maxMultiplier
178 )
594bfb84
JB
179 })
180
106744f7 181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
6db75ad9 183 expect(result).toBeUndefined()
106744f7 184 })
185
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
e1ffb94f 189 expect(result).toStrictEqual(data)
106744f7 190 })
191
9ea61037 192 it('Verify that transferable objects are sent to the worker correctly', async () => {
9ea61037
JB
193 let error
194 let result
195 try {
f05314ff
JB
196 result = await pool.execute(undefined, undefined, [
197 new ArrayBuffer(16),
198 new MessageChannel().port1
199 ])
9ea61037
JB
200 } catch (e) {
201 error = e
202 }
203 expect(result).toStrictEqual({ ok: 1 })
204 expect(error).toBeUndefined()
f05314ff
JB
205 try {
206 result = await pool.execute(undefined, undefined, [
207 new SharedArrayBuffer(16)
208 ])
209 } catch (e) {
210 error = e
211 }
212 expect(result).toStrictEqual({ ok: 1 })
4a8faf08
JB
213 expect(error).toBeInstanceOf(Error)
214 expect(error.message).toMatch(
215 /Found invalid (object|value) in transferList/
f05314ff 216 )
9ea61037
JB
217 })
218
7c0ba920 219 it('Verify that error handling is working properly:sync', async () => {
106744f7 220 const data = { f: 10 }
c726f66c 221 expect(errorPool.emitter.eventNames()).toStrictEqual([])
d46660cd 222 let taskError
041dc05b 223 errorPool.emitter.on(PoolEvents.taskError, e => {
d46660cd
JB
224 taskError = e
225 })
c726f66c 226 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
106744f7 227 let inError
228 try {
229 await errorPool.execute(data)
230 } catch (e) {
231 inError = e
232 }
7c0ba920
JB
233 expect(inError).toBeDefined()
234 expect(inError).toBeInstanceOf(Error)
235 expect(inError.message).toBeDefined()
8620fb25 236 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
237 expect(inError.message).toBe('Error Message from ThreadWorker')
238 expect(taskError).toStrictEqual({
6cd5248f 239 name: DEFAULT_TASK_NAME,
985d0e79
JB
240 message: new Error('Error Message from ThreadWorker'),
241 data
242 })
18482cec
JB
243 expect(
244 errorPool.workerNodes.some(
041dc05b 245 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
246 )
247 ).toBe(true)
7c0ba920
JB
248 })
249
250 it('Verify that error handling is working properly:async', async () => {
251 const data = { f: 10 }
c726f66c 252 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
4f0b85b3 253 let taskError
041dc05b 254 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
4f0b85b3
JB
255 taskError = e
256 })
c726f66c
JB
257 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
258 PoolEvents.taskError
259 ])
7c0ba920
JB
260 let inError
261 try {
262 await asyncErrorPool.execute(data)
263 } catch (e) {
264 inError = e
265 }
266 expect(inError).toBeDefined()
267 expect(inError).toBeInstanceOf(Error)
268 expect(inError.message).toBeDefined()
8620fb25 269 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
270 expect(inError.message).toBe('Error Message from ThreadWorker:async')
271 expect(taskError).toStrictEqual({
6cd5248f 272 name: DEFAULT_TASK_NAME,
985d0e79
JB
273 message: new Error('Error Message from ThreadWorker:async'),
274 data
275 })
18482cec
JB
276 expect(
277 asyncErrorPool.workerNodes.some(
041dc05b 278 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
279 )
280 ).toBe(true)
106744f7 281 })
282
7784f548 283 it('Verify that async function is working properly', async () => {
284 const data = { f: 10 }
15e5141f 285 const startTime = performance.now()
7784f548 286 const result = await asyncPool.execute(data)
15e5141f 287 const usedTime = performance.now() - startTime
e1ffb94f 288 expect(result).toStrictEqual(data)
32d490eb 289 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 290 })
291
506c2a14 292 it('Shutdown test', async () => {
bac873bd 293 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
b04bd0f1 294 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
ef3891a3
JB
295 let poolDestroy = 0
296 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
b04bd0f1
JB
297 expect(pool.emitter.eventNames()).toStrictEqual([
298 PoolEvents.busy,
299 PoolEvents.destroy
300 ])
1f9a5a44 301 await pool.destroy()
bdacc2d2 302 const numberOfExitEvents = await exitPromise
bb9423b7 303 expect(pool.started).toBe(false)
a621172f
JB
304 expect(pool.emitter.eventNames()).toStrictEqual([
305 PoolEvents.busy,
306 PoolEvents.destroy
307 ])
8e8d9101 308 expect(pool.readyEventEmitted).toBe(false)
bb9423b7 309 expect(pool.workerNodes.length).toBe(0)
bdacc2d2 310 expect(numberOfExitEvents).toBe(numberOfThreads)
ef3891a3 311 expect(poolDestroy).toBe(1)
506c2a14 312 })
313
90082c8c 314 it('Verify that thread pool options are checked', async () => {
b2fd3f4a 315 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
0fe39c97
JB
316 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
317 expect(pool.opts.workerOptions).toBeUndefined()
318 await pool.destroy()
319 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
90082c8c
JB
320 workerOptions: {
321 env: { TEST: 'test' },
322 name: 'test'
323 }
324 })
0fe39c97 325 expect(pool.opts.workerOptions).toStrictEqual({
90082c8c
JB
326 env: { TEST: 'test' },
327 name: 'test'
328 })
0fe39c97 329 await pool.destroy()
90082c8c
JB
330 })
331
506c2a14 332 it('Should work even without opts in input', async () => {
b2fd3f4a 333 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 334 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
0fe39c97 335 const res = await pool.execute()
30b963d4 336 expect(res).toStrictEqual({ ok: 1 })
0e2503fc 337 // We need to clean up the resources after our test
0fe39c97 338 await pool.destroy()
506c2a14 339 })
8d3782fa 340
7d27ec0d 341 it('Verify destroyWorkerNode()', async () => {
b2fd3f4a 342 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 343 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
52a23942 344 const workerNodeKey = 0
7d27ec0d 345 let exitEvent = 0
52a23942 346 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
7d27ec0d
JB
347 ++exitEvent
348 })
52a23942 349 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
7d27ec0d
JB
350 expect(exitEvent).toBe(1)
351 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
352 await pool.destroy()
353 })
354
1cc6e9ef 355 it('Verify that a pool with zero worker fails', () => {
8d3782fa 356 expect(
b2fd3f4a 357 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
948faff7 358 ).toThrow('Cannot instantiate a fixed pool with zero worker')
8d3782fa 359 })
506c2a14 360})