fix: fix pool ready event emission
[poolifier.git] / tests / pools / thread / fixed.test.mjs
CommitLineData
a074ffee
JB
1import { expect } from 'expect'
2import { FixedThreadPool, PoolEvents } from '../../../lib/index.js'
3import { TaskFunctions } from '../../test-types.js'
4import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
// Number of worker threads given to every fixed pool created by this suite.
const numberOfThreads = 6
// Per-worker tasks concurrency applied to the queue-enabled pool.
const tasksConcurrency = 2
// Error handler shared by the pools that log worker errors to the console.
const logWorkerError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  { errorHandler: logWorkerError }
)
// Same worker file, with the tasks queue enabled.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool backed by the empty worker file; logs when one of its workers exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.mjs',
  { exitHandler: () => console.info('empty pool worker exited') }
)
// Pool backed by the echo worker file.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.mjs'
)
// Pool backed by the error worker file.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.mjs',
  { errorHandler: logWorkerError }
)
// Pool backed by the async error worker file.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.mjs',
  { errorHandler: logWorkerError }
)
// Pool backed by the async worker file.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.mjs'
)
55
0e2503fc
JB
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // Pools are destroyed one at a time, in this order.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
65
it('Verify that the function is executed in a worker thread', async () => {
  // Run each task function on the shared pool, one after the other.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all.
  const outcome = await pool.execute()
  expect(outcome).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // This test uses its own pool instance rather than the shared one.
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.mjs',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    // No listener is registered on a freshly created pool.
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Always release the worker threads, even when an assertion above fails.
    await pool.destroy()
  }
})
98
94407def
JB
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => ++busyEventCount)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, without awaiting in between.
  const submittedTasks = Array.from({ length: numberOfThreads * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfThreads + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfThreads * maxMultiplier
  const promises = new Set()
  for (let submitted = 0; submitted < totalTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(totalTasks)

  // Expected figures derived from the configured per-worker concurrency.
  const { concurrency } = queuePool.opts.tasksQueueOptions
  const queuedPerWorker = maxMultiplier - concurrency

  // Phase 1: all tasks are submitted, none has been awaited yet.
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBe(0)
  }
  const infoWhileRunning = queuePool.info
  expect(infoWhileRunning.executedTasks).toBe(0)
  expect(infoWhileRunning.executingTasks).toBe(numberOfThreads * concurrency)
  expect(infoWhileRunning.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(infoWhileRunning.maxQueuedTasks).toBe(
    numberOfThreads * queuedPerWorker
  )
  expect(infoWhileRunning.backPressure).toBe(false)
  expect(infoWhileRunning.stolenTasks).toBe(0)

  await Promise.all(promises)

  // Phase 2: every submitted task has settled.
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  const infoWhenDone = queuePool.info
  expect(infoWhenDone.executedTasks).toBe(totalTasks)
  expect(infoWhenDone.backPressure).toBe(false)
  expect(infoWhenDone.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(infoWhenDone.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
172
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task runs on the pool backed by the empty worker file.
  const outcome = await emptyPool.execute()
  expect(outcome).toBeUndefined()
})
177
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool result is expected to equal the payload that was sent.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
183
it('Verify that transferable objects are sent to the worker correctly', async () => {
  // Phase 1: a transfer list made of an ArrayBuffer and a MessagePort must be accepted.
  let validResult
  let validError
  try {
    validResult = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    validError = e
  }
  expect(validResult).toStrictEqual({ ok: 1 })
  expect(validError).toBeUndefined()

  // Phase 2: a SharedArrayBuffer in the transfer list must be rejected.
  // Separate variables are used so that the outcome of phase 1 cannot make
  // the assertions below pass vacuously.
  let invalidResult
  let invalidError
  try {
    invalidResult = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    invalidError = e
  }
  // The call failed, so no result may have been produced.
  expect(invalidResult).toBeUndefined()
  expect(invalidError).toBeInstanceOf(Error)
  expect(invalidError.message).toMatch(
    /Found invalid (object|value) in transferList/
  )
})
210
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool's taskError event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture whatever execute() throws.
  let caughtError
  try {
    await errorPool.execute(data)
  } catch (e) {
    caughtError = e
  }
  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe('Error Message from ThreadWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker'),
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasWorkerWithOneFailure = errorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
241
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool's taskError event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture whatever execute() throws.
  let caughtError
  try {
    await asyncErrorPool.execute(data)
  } catch (e) {
    caughtError = e
  }
  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message === 'string').toBe(true)
  expect(caughtError.message).toBe('Error Message from ThreadWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker:async'),
    data
  })
  // At least one worker node must have recorded exactly one failed task.
  const hasWorkerWithOneFailure = asyncErrorPool.workerNodes.some(
    workerNode => workerNode.usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
274
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure how long the task takes from submission to result.
  const startedAt = performance.now()
  const outcome = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(outcome).toStrictEqual(payload)
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
283
it('Shutdown test', async () => {
  // Start waiting for one 'exit' event per worker before destroying the pool.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // The busy listener registered by an earlier test is still attached.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => ++destroyEventCount)
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])
  await pool.destroy()
  const exitEventCount = await exitPromise
  expect(pool.started).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfThreads)
  expect(destroyEventCount).toBe(1)
})
301
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'

  // Without options, no worker options are set on the pool.
  const defaultPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(defaultPool.opts.workerOptions).toBeUndefined()
  } finally {
    // Always release the worker threads, even when the assertion fails.
    await defaultPool.destroy()
  }

  // Worker options passed at construction are exposed as given.
  const configuredPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  try {
    expect(configuredPool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
  } finally {
    // Always release the worker threads, even when the assertion fails.
    await configuredPool.destroy()
  }
})
319
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  // The pool is created with no options argument.
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, even when it fails
    await pool.destroy()
  }
})
8d3782fa 328
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the 'exit' events emitted by the worker that gets destroyed.
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      pool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(exitEvent).toBe(1)
    // The destroyed worker node is no longer listed in the pool.
    expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
  } finally {
    // Always release the remaining worker threads, even when an assertion fails.
    await pool.destroy()
  }
})
342
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped so that expect() can invoke it and observe the throw.
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
  expect(createZeroWorkerPool).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
506c2a14 348})