refactor: convert commonjs files to esm
[poolifier.git] / tests / pools / thread / fixed.test.mjs
CommitLineData
a074ffee
JB
1import { expect } from 'expect'
2import { FixedThreadPool, PoolEvents } from '../../../lib/index.js'
3import { TaskFunctions } from '../../test-types.js'
4import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
506c2a14 6
079de991 7describe('Fixed thread pool test suite', () => {
e1ffb94f 8 const numberOfThreads = 6
4e377863 9 const tasksConcurrency = 2
e1ffb94f
JB
10 const pool = new FixedThreadPool(
11 numberOfThreads,
b2fd3f4a 12 './tests/worker-files/thread/testWorker.mjs',
e1ffb94f 13 {
041dc05b 14 errorHandler: e => console.error(e)
e1ffb94f
JB
15 }
16 )
594bfb84
JB
17 const queuePool = new FixedThreadPool(
18 numberOfThreads,
b2fd3f4a 19 './tests/worker-files/thread/testWorker.mjs',
594bfb84
JB
20 {
21 enableTasksQueue: true,
22 tasksQueueOptions: {
4e377863 23 concurrency: tasksConcurrency
594bfb84 24 },
041dc05b 25 errorHandler: e => console.error(e)
594bfb84
JB
26 }
27 )
e1ffb94f
JB
28 const emptyPool = new FixedThreadPool(
29 numberOfThreads,
b2fd3f4a 30 './tests/worker-files/thread/emptyWorker.mjs',
73bfd59d 31 { exitHandler: () => console.info('empty pool worker exited') }
e1ffb94f
JB
32 )
33 const echoPool = new FixedThreadPool(
34 numberOfThreads,
b2fd3f4a 35 './tests/worker-files/thread/echoWorker.mjs'
e1ffb94f
JB
36 )
37 const errorPool = new FixedThreadPool(
38 numberOfThreads,
b2fd3f4a 39 './tests/worker-files/thread/errorWorker.mjs',
e1ffb94f 40 {
041dc05b 41 errorHandler: e => console.error(e)
e1ffb94f
JB
42 }
43 )
44 const asyncErrorPool = new FixedThreadPool(
45 numberOfThreads,
b2fd3f4a 46 './tests/worker-files/thread/asyncErrorWorker.mjs',
e1ffb94f 47 {
041dc05b 48 errorHandler: e => console.error(e)
e1ffb94f
JB
49 }
50 )
51 const asyncPool = new FixedThreadPool(
52 numberOfThreads,
b2fd3f4a 53 './tests/worker-files/thread/asyncWorker.mjs'
e1ffb94f
JB
54 )
55
0e2503fc
JB
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
7c0ba920 61 await asyncErrorPool.destroy()
0e2503fc 62 await emptyPool.destroy()
594bfb84 63 await queuePool.destroy()
0e2503fc
JB
64 })
65
506c2a14 66 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9 67 let result = await pool.execute({
dbca3be9 68 function: TaskFunctions.fibonacci
6db75ad9 69 })
024daf59 70 expect(result).toBe(75025)
6db75ad9 71 result = await pool.execute({
dbca3be9 72 function: TaskFunctions.factorial
6db75ad9 73 })
70a4f5ea 74 expect(result).toBe(9.33262154439441e157)
506c2a14 75 })
76
318d4156 77 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 78 const result = await pool.execute()
30b963d4 79 expect(result).toStrictEqual({ ok: 1 })
106744f7 80 })
81
1dcbfefc 82 it("Verify that 'ready' event is emitted", async () => {
0fe39c97 83 const pool = new FixedThreadPool(
079de991 84 numberOfThreads,
b2fd3f4a 85 './tests/worker-files/thread/testWorker.mjs',
079de991 86 {
041dc05b 87 errorHandler: e => console.error(e)
079de991
JB
88 }
89 )
c726f66c 90 expect(pool.emitter.eventNames()).toStrictEqual([])
079de991 91 let poolReady = 0
0fe39c97
JB
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
c726f66c 94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
216541b6 95 expect(poolReady).toBe(1)
0fe39c97 96 await pool.destroy()
216541b6
JB
97 })
98
94407def
JB
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
c726f66c 101 expect(pool.emitter.eventNames()).toStrictEqual([])
7c0ba920 102 let poolBusy = 0
aee46736 103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
c726f66c 104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
7c0ba920 105 for (let i = 0; i < numberOfThreads * 2; i++) {
94407def 106 promises.add(pool.execute())
7c0ba920 107 }
94407def 108 await Promise.all(promises)
14916bf9
JB
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
112 })
113
594bfb84 114 it('Verify that tasks queuing is working', async () => {
d3d4b67d 115 const promises = new Set()
4e377863 116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
594bfb84 117 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 118 promises.add(queuePool.execute())
594bfb84 119 }
d3d4b67d 120 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 121 for (const workerNode of queuePool.workerNodes) {
5bb5be17 122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
f59e1027 123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
124 queuePool.opts.tasksQueueOptions.concurrency
125 )
f59e1027 126 expect(workerNode.usage.tasks.executed).toBe(0)
4e377863
JB
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
129 )
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
132 )
1f0766e7 133 expect(workerNode.usage.tasks.stolen).toBe(0)
594bfb84 134 }
1f0766e7 135 expect(queuePool.info.executedTasks).toBe(0)
4e377863
JB
136 expect(queuePool.info.executingTasks).toBe(
137 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
138 )
6b27d407 139 expect(queuePool.info.queuedTasks).toBe(
4e377863
JB
140 numberOfThreads *
141 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
6b27d407
JB
142 )
143 expect(queuePool.info.maxQueuedTasks).toBe(
4e377863
JB
144 numberOfThreads *
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
d3d4b67d 146 )
a1763c54 147 expect(queuePool.info.backPressure).toBe(false)
1f0766e7 148 expect(queuePool.info.stolenTasks).toBe(0)
594bfb84
JB
149 await Promise.all(promises)
150 for (const workerNode of queuePool.workerNodes) {
a6b3272b
JB
151 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
152 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
153 numberOfThreads * maxMultiplier
154 )
4e377863 155 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
f59e1027 156 expect(workerNode.usage.tasks.queued).toBe(0)
4e377863
JB
157 expect(workerNode.usage.tasks.maxQueued).toBe(
158 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
159 )
1f0766e7
JB
160 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
161 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
162 numberOfThreads * maxMultiplier
163 )
594bfb84 164 }
1f0766e7
JB
165 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
166 expect(queuePool.info.backPressure).toBe(false)
167 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
168 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
169 numberOfThreads * maxMultiplier
170 )
594bfb84
JB
171 })
172
106744f7 173 it('Verify that is possible to have a worker that return undefined', async () => {
174 const result = await emptyPool.execute()
6db75ad9 175 expect(result).toBeUndefined()
106744f7 176 })
177
178 it('Verify that data are sent to the worker correctly', async () => {
179 const data = { f: 10 }
180 const result = await echoPool.execute(data)
e1ffb94f 181 expect(result).toStrictEqual(data)
106744f7 182 })
183
9ea61037 184 it('Verify that transferable objects are sent to the worker correctly', async () => {
9ea61037
JB
185 let error
186 let result
187 try {
f05314ff
JB
188 result = await pool.execute(undefined, undefined, [
189 new ArrayBuffer(16),
190 new MessageChannel().port1
191 ])
9ea61037
JB
192 } catch (e) {
193 error = e
194 }
195 expect(result).toStrictEqual({ ok: 1 })
196 expect(error).toBeUndefined()
f05314ff
JB
197 try {
198 result = await pool.execute(undefined, undefined, [
199 new SharedArrayBuffer(16)
200 ])
201 } catch (e) {
202 error = e
203 }
204 expect(result).toStrictEqual({ ok: 1 })
205 expect(error).toStrictEqual(
206 new TypeError('Found invalid object in transferList')
207 )
9ea61037
JB
208 })
209
7c0ba920 210 it('Verify that error handling is working properly:sync', async () => {
106744f7 211 const data = { f: 10 }
c726f66c 212 expect(errorPool.emitter.eventNames()).toStrictEqual([])
d46660cd 213 let taskError
041dc05b 214 errorPool.emitter.on(PoolEvents.taskError, e => {
d46660cd
JB
215 taskError = e
216 })
c726f66c 217 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
106744f7 218 let inError
219 try {
220 await errorPool.execute(data)
221 } catch (e) {
222 inError = e
223 }
7c0ba920
JB
224 expect(inError).toBeDefined()
225 expect(inError).toBeInstanceOf(Error)
226 expect(inError.message).toBeDefined()
8620fb25 227 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
228 expect(inError.message).toBe('Error Message from ThreadWorker')
229 expect(taskError).toStrictEqual({
6cd5248f 230 name: DEFAULT_TASK_NAME,
985d0e79
JB
231 message: new Error('Error Message from ThreadWorker'),
232 data
233 })
18482cec
JB
234 expect(
235 errorPool.workerNodes.some(
041dc05b 236 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
237 )
238 ).toBe(true)
7c0ba920
JB
239 })
240
241 it('Verify that error handling is working properly:async', async () => {
242 const data = { f: 10 }
c726f66c 243 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
4f0b85b3 244 let taskError
041dc05b 245 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
4f0b85b3
JB
246 taskError = e
247 })
c726f66c
JB
248 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
249 PoolEvents.taskError
250 ])
7c0ba920
JB
251 let inError
252 try {
253 await asyncErrorPool.execute(data)
254 } catch (e) {
255 inError = e
256 }
257 expect(inError).toBeDefined()
258 expect(inError).toBeInstanceOf(Error)
259 expect(inError.message).toBeDefined()
8620fb25 260 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
261 expect(inError.message).toBe('Error Message from ThreadWorker:async')
262 expect(taskError).toStrictEqual({
6cd5248f 263 name: DEFAULT_TASK_NAME,
985d0e79
JB
264 message: new Error('Error Message from ThreadWorker:async'),
265 data
266 })
18482cec
JB
267 expect(
268 asyncErrorPool.workerNodes.some(
041dc05b 269 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
270 )
271 ).toBe(true)
106744f7 272 })
273
7784f548 274 it('Verify that async function is working properly', async () => {
275 const data = { f: 10 }
15e5141f 276 const startTime = performance.now()
7784f548 277 const result = await asyncPool.execute(data)
15e5141f 278 const usedTime = performance.now() - startTime
e1ffb94f 279 expect(result).toStrictEqual(data)
32d490eb 280 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 281 })
282
506c2a14 283 it('Shutdown test', async () => {
bac873bd 284 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
b04bd0f1 285 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
ef3891a3
JB
286 let poolDestroy = 0
287 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
b04bd0f1
JB
288 expect(pool.emitter.eventNames()).toStrictEqual([
289 PoolEvents.busy,
290 PoolEvents.destroy
291 ])
1f9a5a44 292 await pool.destroy()
bdacc2d2 293 const numberOfExitEvents = await exitPromise
bb9423b7
JB
294 expect(pool.started).toBe(false)
295 expect(pool.workerNodes.length).toBe(0)
bdacc2d2 296 expect(numberOfExitEvents).toBe(numberOfThreads)
ef3891a3 297 expect(poolDestroy).toBe(1)
506c2a14 298 })
299
90082c8c 300 it('Verify that thread pool options are checked', async () => {
b2fd3f4a 301 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
0fe39c97
JB
302 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
303 expect(pool.opts.workerOptions).toBeUndefined()
304 await pool.destroy()
305 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
90082c8c
JB
306 workerOptions: {
307 env: { TEST: 'test' },
308 name: 'test'
309 }
310 })
0fe39c97 311 expect(pool.opts.workerOptions).toStrictEqual({
90082c8c
JB
312 env: { TEST: 'test' },
313 name: 'test'
314 })
0fe39c97 315 await pool.destroy()
90082c8c
JB
316 })
317
506c2a14 318 it('Should work even without opts in input', async () => {
b2fd3f4a 319 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 320 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
0fe39c97 321 const res = await pool.execute()
30b963d4 322 expect(res).toStrictEqual({ ok: 1 })
0e2503fc 323 // We need to clean up the resources after our test
0fe39c97 324 await pool.destroy()
506c2a14 325 })
8d3782fa 326
7d27ec0d 327 it('Verify destroyWorkerNode()', async () => {
b2fd3f4a 328 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
7d27ec0d 329 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
52a23942 330 const workerNodeKey = 0
7d27ec0d 331 let exitEvent = 0
52a23942 332 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
7d27ec0d
JB
333 ++exitEvent
334 })
52a23942 335 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
7d27ec0d
JB
336 expect(exitEvent).toBe(1)
337 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
338 await pool.destroy()
339 })
340
8d3782fa
JB
341 it('Verify that a pool with zero worker fails', async () => {
342 expect(
b2fd3f4a 343 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
2431bdb4 344 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 345 })
506c2a14 346})