3da05735588953cf6c0882a30aa86a231dd8b841
[poolifier.git] / tests / pools / thread / fixed.test.mjs
1 import { expect } from 'expect'
2
3 import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
7
8 describe('Fixed thread pool test suite', () => {
// Number of worker threads requested for every fixed pool of this suite.
const numberOfThreads = 6
// Value given to `tasksQueueOptions.concurrency` of the queue-enabled pool.
const tasksConcurrency = 2
// Used as the `errorHandler` pool option: prints the received error.
const logWorkerError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  { errorHandler: logWorkerError }
)
// Same worker file as `pool`, with the tasks queue enabled.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool running emptyWorker.mjs; logs a message from its exit handler.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.mjs',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)
// Pool running echoWorker.mjs, created with default options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.mjs'
)
// Pool running errorWorker.mjs (used by the ':sync' error handling test).
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.mjs',
  { errorHandler: logWorkerError }
)
// Pool running asyncErrorWorker.mjs (used by the ':async' error handling test).
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.mjs',
  { errorHandler: logWorkerError }
)
// Pool running asyncWorker.mjs, created with default options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.mjs'
)
56
after('Destroy all pools', async () => {
  // Resources must be released once the suite is done. The pools are
  // destroyed sequentially, one after the other. `pool` is not listed here:
  // the 'Shutdown test' case destroys it.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
66
it('Verify that the function is executed in a worker thread', async () => {
  // The payload's `function` field carries one of the TaskFunctions values;
  // the expected numbers below are the results the test worker must return.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(354224848179262000000)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
77
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all.
  const expectedResult = { ok: 1 }
  expect(await pool.execute()).toStrictEqual(expectedResult)
})
82
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is created so that the listener can be attached before
  // the pool reports readiness. It is named differently from the suite-level
  // `pool` to avoid shadowing it.
  const readyPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.mjs',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(readyPool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    readyPool.emitter.on(PoolEvents.ready, () => ++poolReady)
    // NOTE(review): waitPoolEvents() lives in test-utils.cjs; presumably it
    // resolves once one `ready` event has been observed — confirm there.
    await waitPoolEvents(readyPool, PoolEvents.ready, 1)
    expect(readyPool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Destroy the pool even when an assertion above fails, so that its
    // worker threads do not outlive the test.
    await readyPool.destroy()
  }
})
99
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    busyEventCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfThreads * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once
  // reaches the number of fixed pool workers. So in total it fires
  // numberOfThreads + 1 times for a loop submitting up to
  // numberOfThreads * 2 tasks to the fixed pool.
  expect(busyEventCount).toBe(numberOfThreads + 1)
})
114
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const totalTasks = numberOfThreads * maxMultiplier
  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks expected to wait in each worker's queue while `concurrency` run.
  const queuedPerWorker = maxMultiplier - concurrency

  // Submit every task up front, without awaiting any of them.
  const submittedTasks = Array.from({ length: totalTasks }, () =>
    queuePool.execute()
  )
  expect(submittedTasks.length).toBe(totalTasks)

  // State right after submission: nothing has completed yet.
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBe(0)
    expect(tasks.stolen).toBe(0)
  }
  const infoBefore = queuePool.info
  expect(infoBefore.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(submittedTasks)

  // State once every task has settled: queues are drained. The stealing
  // counters are only bounded, since their exact values are not fixed.
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(totalTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(tasks.sequentiallyStolen).toBeLessThanOrEqual(totalTasks)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(totalTasks)
  }
  expect(queuePool.info.executedTasks).toBe(totalTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(totalTasks)
})
180
it('Verify that is possible to have a worker that return undefined', async () => {
  // emptyPool runs emptyWorker.mjs; the task must resolve with no value.
  expect(await emptyPool.execute()).toBeUndefined()
})
185
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool must hand back a value structurally equal to its input.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
191
it('Verify that transferable objects are sent to the worker correctly', async () => {
  // Case 1: a transfer list holding an ArrayBuffer and a MessagePort must be
  // accepted, and the task must complete normally.
  let validResult
  let validError
  try {
    validResult = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    validError = e
  }
  expect(validResult).toStrictEqual({ ok: 1 })
  expect(validError).toBeUndefined()

  // Case 2: a SharedArrayBuffer in the transfer list must be rejected.
  // Dedicated variables are used on purpose: reusing the ones from case 1
  // would let the previous `{ ok: 1 }` result satisfy the assertions even
  // though the failing call never produced a value.
  let invalidResult
  let invalidError
  try {
    invalidResult = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    invalidError = e
  }
  expect(invalidResult).toBeUndefined()
  expect(invalidError).toBeInstanceOf(Error)
  // The regexp accepts both the "object" and the "value" wording.
  expect(invalidError.message).toMatch(
    /Found invalid (object|value) in transferList/
  )
})
218
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker'

  // Record the payload of the pool's `taskError` event.
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])

  // Capture whatever execute() fails with.
  let caughtError
  try {
    await errorPool.execute(payload)
  } catch (e) {
    caughtError = e
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message).toBe('string')
  expect(caughtError.message).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error(expectedMessage),
    data: payload
  })
  // At least one worker node must have counted exactly one failed task.
  const someWorkerFailedOnce = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
249
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker:async'

  // Record the payload of the pool's `taskError` event.
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])

  // Capture whatever execute() fails with.
  let caughtError
  try {
    await asyncErrorPool.execute(payload)
  } catch (e) {
    caughtError = e
  }

  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message).toBe('string')
  expect(caughtError.message).toBe(expectedMessage)
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error(expectedMessage),
    data: payload
  })
  // At least one worker node must have counted exactly one failed task.
  const someWorkerFailedOnce = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(someWorkerFailedOnce).toBe(true)
})
282
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall-clock time spent waiting for the task.
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // The task is expected to take no less than 2000 ms to resolve.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
291
it('Shutdown test', async () => {
  // The wait on worker 'exit' events is set up before destroy() is called.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // A `busy` listener is still attached by the 'busy' event test case.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })
  const expectedEventNames = [PoolEvents.busy, PoolEvents.destroy]
  expect(pool.emitter.eventNames()).toStrictEqual(expectedEventNames)

  await pool.destroy()
  const exitEventCount = await workersExited

  expect(pool.started).toBe(false)
  // Destroying the pool must leave the registered listeners in place.
  expect(pool.emitter.eventNames()).toStrictEqual(expectedEventNames)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventCount).toBe(numberOfThreads)
  expect(destroyEventCount).toBe(1)
})
313
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'

  // Without options, no worker options are stored on the pool.
  const defaultPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(defaultPool.opts.workerOptions).toBeUndefined()
  } finally {
    // Always release the worker threads, even when the assertion fails.
    await defaultPool.destroy()
  }

  // Explicit worker options must be kept as given. The expected value is a
  // separate literal so the comparison is structural, not by reference.
  const customPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  try {
    expect(customPool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
  } finally {
    // Always release the worker threads, even when the assertion fails.
    await customPool.destroy()
  }
})
331
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  // The constructor is called with no options argument. The local name
  // differs from the suite-level `pool` to avoid shadowing it.
  const optionlessPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const res = await optionlessPool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task or the assertion above fails.
    await optionlessPool.destroy()
  }
})
340
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  // The local name differs from the suite-level `pool` to avoid shadowing it.
  const localPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the 'exit' events of the worker that is about to be destroyed.
    let exitEvent = 0
    localPool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      localPool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(exitEvent).toBe(1)
    // Calling destroyWorkerNode() directly simulates an illegitimate worker
    // node destroy: the pool must still hold its minimum number of worker
    // nodes afterwards.
    expect(localPool.workerNodes.length).toBe(numberOfThreads)
  } finally {
    // Release the remaining worker threads even when an assertion fails.
    await localPool.destroy()
  }
})
355
it('Verify that a pool with zero worker fails', () => {
  // The constructor call is wrapped so that expect() can invoke it and
  // observe the thrown error.
  const createEmptySizedPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
  expect(createEmptySizedPool).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
361 })