8b2fe42047bc4c91b2a8d01599842878bd9950e5
[poolifier.git] / tests / pools / thread / fixed.test.js
1 const { expect } = require('expect')
2 const { FixedThreadPool, PoolEvents } = require('../../../lib')
3 const { WorkerFunctions } = require('../../test-types')
4 const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
5
/**
 * Test suite for `FixedThreadPool`.
 *
 * The pools declared at the top of the suite are shared by the tests below:
 * `pool` is destroyed by the 'Shutdown test', every other shared pool is
 * destroyed by the `after` hook.
 */
describe('Fixed thread pool test suite', () => {
  const numberOfThreads = 6
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  let poolReady = 0
  pool.emitter.on(PoolEvents.ready, () => ++poolReady)
  // Mocha does not await `describe` callbacks, so the suite body must stay
  // synchronous. The wait is started here, right after the pool creation (as
  // before), and only awaited later in the `before` hook.
  // NOTE(review): assumes waitPoolEvents() registers its listener
  // synchronously when called - confirm against tests/test-utils.
  const poolReadyPromise = waitPoolEvents(pool, PoolEvents.ready, 1)
  const queuePool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true,
      tasksQueueOptions: {
        concurrency: 2
      },
      errorHandler: e => console.error(e)
    }
  )
  const emptyPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/emptyWorker.js',
    { exitHandler: () => console.log('empty pool worker exited') }
  )
  const echoPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/echoWorker.js'
  )
  const errorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/errorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncErrorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncErrorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncWorker.js'
  )

  before('Wait for the shared pool to be ready', async () => {
    await poolReadyPromise
  })

  after('Destroy all pools', async () => {
    // We need to clean up the resources after our test
    await echoPool.destroy()
    await asyncPool.destroy()
    await errorPool.destroy()
    await asyncErrorPool.destroy()
    await emptyPool.destroy()
    await queuePool.destroy()
  })

  it('Verify that the function is executed in a worker thread', async () => {
    let result = await pool.execute({
      function: WorkerFunctions.fibonacci
    })
    expect(result).toBe(75025)
    result = await pool.execute({
      function: WorkerFunctions.factorial
    })
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that is possible to invoke the execute() method without input', async () => {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  })

  it("Verify that 'ready' event is emitted", () => {
    // The `before` hook has already awaited the event, the counter is final.
    expect(poolReady).toBe(1)
  })

  it("Verify that 'busy' event is emitted", async () => {
    const promises = new Set()
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfThreads * 2; i++) {
      promises.add(pool.execute())
    }
    // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
    // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfThreads + 1)
    // Do not leave the submitted tasks floating: the following tests
    // (including the shutdown one) reuse the same pool.
    await Promise.all(promises)
  })

  it('Verify that tasks queuing is working', async () => {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfThreads * maxMultiplier)
    // Right after submission: nothing has completed yet and each worker node
    // holds queued tasks.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
    }
    expect(queuePool.info.executingTasks).toBe(numberOfThreads)
    expect(queuePool.info.queuedTasks).toBe(
      numberOfThreads * maxMultiplier - numberOfThreads
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfThreads * maxMultiplier - numberOfThreads
    )
    await Promise.all(promises)
    // Once every task has settled: queues are drained and nothing is running.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBe(0)
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      expect(workerNode.usage.tasks.maxQueued).toBe(1)
    }
  })

  it('Verify that is possible to have a worker that return undefined', async () => {
    const result = await emptyPool.execute()
    expect(result).toBeUndefined()
  })

  it('Verify that data are sent to the worker correctly', async () => {
    const data = { f: 10 }
    const result = await echoPool.execute(data)
    expect(result).toStrictEqual(data)
  })

  it('Verify that error handling is working properly:sync', async () => {
    const data = { f: 10 }
    let taskError
    errorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    let inError
    try {
      await errorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toBeDefined()
    expect(typeof inError.message).toBe('string')
    expect(inError.message).toBe('Error Message from ThreadWorker')
    expect(taskError).toStrictEqual({
      message: new Error('Error Message from ThreadWorker'),
      data
    })
    expect(
      errorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that error handling is working properly:async', async () => {
    const data = { f: 10 }
    let taskError
    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    let inError
    try {
      await asyncErrorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toBeDefined()
    expect(typeof inError.message).toBe('string')
    expect(inError.message).toBe('Error Message from ThreadWorker:async')
    expect(taskError).toStrictEqual({
      message: new Error('Error Message from ThreadWorker:async'),
      data
    })
    expect(
      asyncErrorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that async function is working properly', async () => {
    const data = { f: 10 }
    const startTime = performance.now()
    const result = await asyncPool.execute(data)
    const usedTime = performance.now() - startTime
    expect(result).toStrictEqual(data)
    expect(usedTime).toBeGreaterThanOrEqual(2000)
  })

  it('Shutdown test', async () => {
    // Start counting worker `exit` events before triggering the destroy.
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(numberOfExitEvents).toBe(numberOfThreads)
  })

  it('Verify that thread pool options are checked', async () => {
    const workerFilePath = './tests/worker-files/thread/testWorker.js'
    let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
    expect(pool1.opts.workerOptions).toBeUndefined()
    await pool1.destroy()
    pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
      workerOptions: {
        env: { TEST: 'test' },
        name: 'test'
      }
    })
    expect(pool1.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
    await pool1.destroy()
  })

  it('Should work even without opts in input', async () => {
    const pool1 = new FixedThreadPool(
      numberOfThreads,
      './tests/worker-files/thread/testWorker.js'
    )
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
    // We need to clean up the resources after our test
    await pool1.destroy()
  })

  it('Verify that a pool with zero worker fails', () => {
    expect(
      () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
    ).toThrowError('Cannot instantiate a fixed pool with zero worker')
  })
})