901c6665c7d0086bdf453466a7c74027a2c0e6d6
[poolifier.git] / tests / pools / thread / fixed.test.js
1 const { expect } = require('expect')
2 const { FixedThreadPool, PoolEvents } = require('../../../lib')
3 const { TaskFunctions } = require('../../test-types')
4 const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
5 const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
6
7 describe('Fixed thread pool test suite', () => {
8 const numberOfThreads = 6
9 const tasksConcurrency = 2
10 const pool = new FixedThreadPool(
11 numberOfThreads,
12 './tests/worker-files/thread/testWorker.js',
13 {
14 errorHandler: (e) => console.error(e)
15 }
16 )
17 const queuePool = new FixedThreadPool(
18 numberOfThreads,
19 './tests/worker-files/thread/testWorker.js',
20 {
21 enableTasksQueue: true,
22 tasksQueueOptions: {
23 concurrency: tasksConcurrency
24 },
25 errorHandler: (e) => console.error(e)
26 }
27 )
28 const emptyPool = new FixedThreadPool(
29 numberOfThreads,
30 './tests/worker-files/thread/emptyWorker.js',
31 { exitHandler: () => console.info('empty pool worker exited') }
32 )
33 const echoPool = new FixedThreadPool(
34 numberOfThreads,
35 './tests/worker-files/thread/echoWorker.js'
36 )
37 const errorPool = new FixedThreadPool(
38 numberOfThreads,
39 './tests/worker-files/thread/errorWorker.js',
40 {
41 errorHandler: (e) => console.error(e)
42 }
43 )
44 const asyncErrorPool = new FixedThreadPool(
45 numberOfThreads,
46 './tests/worker-files/thread/asyncErrorWorker.js',
47 {
48 errorHandler: (e) => console.error(e)
49 }
50 )
51 const asyncPool = new FixedThreadPool(
52 numberOfThreads,
53 './tests/worker-files/thread/asyncWorker.js'
54 )
55
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
64 })
65
66 it('Verify that the function is executed in a worker thread', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
69 })
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
73 })
74 expect(result).toBe(9.33262154439441e157)
75 })
76
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
80 })
81
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedThreadPool(
84 numberOfThreads,
85 './tests/worker-files/thread/testWorker.js',
86 {
87 errorHandler: (e) => console.error(e)
88 }
89 )
90 let poolReady = 0
91 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
92 await waitPoolEvents(pool, PoolEvents.ready, 1)
93 expect(poolReady).toBe(1)
94 await pool.destroy()
95 })
96
97 it("Verify that 'busy' event is emitted", async () => {
98 const promises = new Set()
99 let poolBusy = 0
100 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
101 for (let i = 0; i < numberOfThreads * 2; i++) {
102 promises.add(pool.execute())
103 }
104 await Promise.all(promises)
105 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
106 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
107 expect(poolBusy).toBe(numberOfThreads + 1)
108 })
109
110 it('Verify that tasks queuing is working', async () => {
111 const promises = new Set()
112 const maxMultiplier = 3 // Must be greater than tasksConcurrency
113 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
114 promises.add(queuePool.execute())
115 }
116 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
117 for (const workerNode of queuePool.workerNodes) {
118 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
119 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
120 queuePool.opts.tasksQueueOptions.concurrency
121 )
122 expect(workerNode.usage.tasks.executed).toBe(0)
123 expect(workerNode.usage.tasks.queued).toBe(
124 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
125 )
126 expect(workerNode.usage.tasks.maxQueued).toBe(
127 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
128 )
129 expect(workerNode.usage.tasks.stolen).toBe(0)
130 }
131 expect(queuePool.info.executedTasks).toBe(0)
132 expect(queuePool.info.executingTasks).toBe(
133 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
134 )
135 expect(queuePool.info.queuedTasks).toBe(
136 numberOfThreads *
137 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
138 )
139 expect(queuePool.info.maxQueuedTasks).toBe(
140 numberOfThreads *
141 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
142 )
143 expect(queuePool.info.backPressure).toBe(false)
144 expect(queuePool.info.stolenTasks).toBe(0)
145 await Promise.all(promises)
146 for (const workerNode of queuePool.workerNodes) {
147 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
148 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
149 numberOfThreads * maxMultiplier
150 )
151 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
152 expect(workerNode.usage.tasks.queued).toBe(0)
153 expect(workerNode.usage.tasks.maxQueued).toBe(
154 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
155 )
156 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
157 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
158 numberOfThreads * maxMultiplier
159 )
160 }
161 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
162 expect(queuePool.info.backPressure).toBe(false)
163 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
164 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
165 numberOfThreads * maxMultiplier
166 )
167 })
168
169 it('Verify that is possible to have a worker that return undefined', async () => {
170 const result = await emptyPool.execute()
171 expect(result).toBeUndefined()
172 })
173
174 it('Verify that data are sent to the worker correctly', async () => {
175 const data = { f: 10 }
176 const result = await echoPool.execute(data)
177 expect(result).toStrictEqual(data)
178 })
179
180 it('Verify that transferable objects are sent to the worker correctly', async () => {
181 let error
182 let result
183 try {
184 result = await pool.execute(undefined, undefined, [
185 new ArrayBuffer(16),
186 new MessageChannel().port1
187 ])
188 } catch (e) {
189 error = e
190 }
191 expect(result).toStrictEqual({ ok: 1 })
192 expect(error).toBeUndefined()
193 try {
194 result = await pool.execute(undefined, undefined, [
195 new SharedArrayBuffer(16)
196 ])
197 } catch (e) {
198 error = e
199 }
200 expect(result).toStrictEqual({ ok: 1 })
201 expect(error).toStrictEqual(
202 new TypeError('Found invalid object in transferList')
203 )
204 })
205
206 it('Verify that error handling is working properly:sync', async () => {
207 const data = { f: 10 }
208 let taskError
209 errorPool.emitter.on(PoolEvents.taskError, (e) => {
210 taskError = e
211 })
212 let inError
213 try {
214 await errorPool.execute(data)
215 } catch (e) {
216 inError = e
217 }
218 expect(inError).toBeDefined()
219 expect(inError).toBeInstanceOf(Error)
220 expect(inError.message).toBeDefined()
221 expect(typeof inError.message === 'string').toBe(true)
222 expect(inError.message).toBe('Error Message from ThreadWorker')
223 expect(taskError).toStrictEqual({
224 name: DEFAULT_TASK_NAME,
225 message: new Error('Error Message from ThreadWorker'),
226 data
227 })
228 expect(
229 errorPool.workerNodes.some(
230 (workerNode) => workerNode.usage.tasks.failed === 1
231 )
232 ).toBe(true)
233 })
234
235 it('Verify that error handling is working properly:async', async () => {
236 const data = { f: 10 }
237 let taskError
238 asyncErrorPool.emitter.on(PoolEvents.taskError, (e) => {
239 taskError = e
240 })
241 let inError
242 try {
243 await asyncErrorPool.execute(data)
244 } catch (e) {
245 inError = e
246 }
247 expect(inError).toBeDefined()
248 expect(inError).toBeInstanceOf(Error)
249 expect(inError.message).toBeDefined()
250 expect(typeof inError.message === 'string').toBe(true)
251 expect(inError.message).toBe('Error Message from ThreadWorker:async')
252 expect(taskError).toStrictEqual({
253 name: DEFAULT_TASK_NAME,
254 message: new Error('Error Message from ThreadWorker:async'),
255 data
256 })
257 expect(
258 asyncErrorPool.workerNodes.some(
259 (workerNode) => workerNode.usage.tasks.failed === 1
260 )
261 ).toBe(true)
262 })
263
264 it('Verify that async function is working properly', async () => {
265 const data = { f: 10 }
266 const startTime = performance.now()
267 const result = await asyncPool.execute(data)
268 const usedTime = performance.now() - startTime
269 expect(result).toStrictEqual(data)
270 expect(usedTime).toBeGreaterThanOrEqual(2000)
271 })
272
273 it('Shutdown test', async () => {
274 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
275 let poolDestroy = 0
276 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
277 await pool.destroy()
278 const numberOfExitEvents = await exitPromise
279 expect(pool.started).toBe(false)
280 expect(pool.workerNodes.length).toBe(0)
281 expect(numberOfExitEvents).toBe(numberOfThreads)
282 expect(poolDestroy).toBe(1)
283 })
284
285 it('Verify that thread pool options are checked', async () => {
286 const workerFilePath = './tests/worker-files/thread/testWorker.js'
287 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
288 expect(pool.opts.workerOptions).toBeUndefined()
289 await pool.destroy()
290 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
291 workerOptions: {
292 env: { TEST: 'test' },
293 name: 'test'
294 }
295 })
296 expect(pool.opts.workerOptions).toStrictEqual({
297 env: { TEST: 'test' },
298 name: 'test'
299 })
300 await pool.destroy()
301 })
302
303 it('Should work even without opts in input', async () => {
304 const workerFilePath = './tests/worker-files/thread/testWorker.js'
305 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
306 const res = await pool.execute()
307 expect(res).toStrictEqual({ ok: 1 })
308 // We need to clean up the resources after our test
309 await pool.destroy()
310 })
311
312 it('Verify destroyWorkerNode()', async () => {
313 const workerFilePath = './tests/worker-files/thread/testWorker.js'
314 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
315 const workerNodeKey = 0
316 let exitEvent = 0
317 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
318 ++exitEvent
319 })
320 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
321 expect(exitEvent).toBe(1)
322 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
323 await pool.destroy()
324 })
325
326 it('Verify that a pool with zero worker fails', async () => {
327 expect(
328 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
329 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
330 })
331 })