feat: make IWRR strategy worker readiness aware
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
216541b6 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
079de991 6describe('Fixed thread pool test suite', () => {
e1ffb94f
JB
7 const numberOfThreads = 6
8 const pool = new FixedThreadPool(
9 numberOfThreads,
10 './tests/worker-files/thread/testWorker.js',
11 {
12 errorHandler: e => console.error(e)
13 }
14 )
594bfb84
JB
15 const queuePool = new FixedThreadPool(
16 numberOfThreads,
17 './tests/worker-files/thread/testWorker.js',
18 {
19 enableTasksQueue: true,
20 tasksQueueOptions: {
21 concurrency: 2
22 },
23 errorHandler: e => console.error(e)
24 }
25 )
e1ffb94f
JB
26 const emptyPool = new FixedThreadPool(
27 numberOfThreads,
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler: () => console.log('empty pool worker exited') }
30 )
31 const echoPool = new FixedThreadPool(
32 numberOfThreads,
33 './tests/worker-files/thread/echoWorker.js'
34 )
35 const errorPool = new FixedThreadPool(
36 numberOfThreads,
37 './tests/worker-files/thread/errorWorker.js',
38 {
39 errorHandler: e => console.error(e)
40 }
41 )
42 const asyncErrorPool = new FixedThreadPool(
43 numberOfThreads,
44 './tests/worker-files/thread/asyncErrorWorker.js',
45 {
46 errorHandler: e => console.error(e)
47 }
48 )
49 const asyncPool = new FixedThreadPool(
50 numberOfThreads,
51 './tests/worker-files/thread/asyncWorker.js'
52 )
53
0e2503fc
JB
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool.destroy()
57 await asyncPool.destroy()
58 await errorPool.destroy()
7c0ba920 59 await asyncErrorPool.destroy()
0e2503fc 60 await emptyPool.destroy()
594bfb84 61 await queuePool.destroy()
0e2503fc
JB
62 })
63
506c2a14 64 it('Verify that the function is executed in a worker thread', async () => {
6db75ad9
JB
65 let result = await pool.execute({
66 function: WorkerFunctions.fibonacci
67 })
024daf59 68 expect(result).toBe(75025)
6db75ad9
JB
69 result = await pool.execute({
70 function: WorkerFunctions.factorial
71 })
70a4f5ea 72 expect(result).toBe(9.33262154439441e157)
506c2a14 73 })
74
318d4156 75 it('Verify that is possible to invoke the execute() method without input', async () => {
106744f7 76 const result = await pool.execute()
30b963d4 77 expect(result).toStrictEqual({ ok: 1 })
106744f7 78 })
79
216541b6 80 it("Verify that 'ready' event is emitted", async () => {
079de991
JB
81 const pool1 = new FixedThreadPool(
82 numberOfThreads,
83 './tests/worker-files/thread/testWorker.js',
84 {
85 errorHandler: e => console.error(e)
86 }
87 )
88 let poolReady = 0
89 pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
3f7d99f5
JB
90 if (!pool1.info.ready) {
91 await waitPoolEvents(pool1, PoolEvents.ready, 1)
92 }
216541b6
JB
93 expect(poolReady).toBe(1)
94 })
95
aee46736 96 it("Verify that 'busy' event is emitted", async () => {
7c0ba920 97 let poolBusy = 0
aee46736 98 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
7c0ba920 99 for (let i = 0; i < numberOfThreads * 2; i++) {
8cbb82eb 100 pool.execute()
7c0ba920 101 }
14916bf9
JB
102 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
103 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
104 expect(poolBusy).toBe(numberOfThreads + 1)
7c0ba920
JB
105 })
106
594bfb84 107 it('Verify that tasks queuing is working', async () => {
d3d4b67d 108 const promises = new Set()
ee9f5295 109 const maxMultiplier = 2
594bfb84 110 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
d3d4b67d 111 promises.add(queuePool.execute())
594bfb84 112 }
d3d4b67d 113 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
594bfb84 114 for (const workerNode of queuePool.workerNodes) {
f59e1027 115 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
594bfb84
JB
116 queuePool.opts.tasksQueueOptions.concurrency
117 )
f59e1027
JB
118 expect(workerNode.usage.tasks.executed).toBe(0)
119 expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
120 expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
594bfb84 121 }
a4e07f72 122 expect(queuePool.info.executingTasks).toBe(numberOfThreads)
6b27d407
JB
123 expect(queuePool.info.queuedTasks).toBe(
124 numberOfThreads * maxMultiplier - numberOfThreads
125 )
126 expect(queuePool.info.maxQueuedTasks).toBe(
d3d4b67d
JB
127 numberOfThreads * maxMultiplier - numberOfThreads
128 )
594bfb84
JB
129 await Promise.all(promises)
130 for (const workerNode of queuePool.workerNodes) {
f59e1027
JB
131 expect(workerNode.usage.tasks.executing).toBe(0)
132 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
133 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
134 expect(workerNode.usage.tasks.queued).toBe(0)
135 expect(workerNode.usage.tasks.maxQueued).toBe(1)
594bfb84
JB
136 }
137 })
138
106744f7 139 it('Verify that is possible to have a worker that return undefined', async () => {
140 const result = await emptyPool.execute()
6db75ad9 141 expect(result).toBeUndefined()
106744f7 142 })
143
144 it('Verify that data are sent to the worker correctly', async () => {
145 const data = { f: 10 }
146 const result = await echoPool.execute(data)
e1ffb94f 147 expect(result).toStrictEqual(data)
106744f7 148 })
149
7c0ba920 150 it('Verify that error handling is working properly:sync', async () => {
106744f7 151 const data = { f: 10 }
d46660cd
JB
152 let taskError
153 errorPool.emitter.on(PoolEvents.taskError, e => {
154 taskError = e
155 })
106744f7 156 let inError
157 try {
158 await errorPool.execute(data)
159 } catch (e) {
160 inError = e
161 }
7c0ba920
JB
162 expect(inError).toBeDefined()
163 expect(inError).toBeInstanceOf(Error)
164 expect(inError.message).toBeDefined()
8620fb25 165 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
166 expect(inError.message).toBe('Error Message from ThreadWorker')
167 expect(taskError).toStrictEqual({
ff128cc9 168 name: 'default',
985d0e79
JB
169 message: new Error('Error Message from ThreadWorker'),
170 data
171 })
18482cec
JB
172 expect(
173 errorPool.workerNodes.some(
f59e1027 174 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
175 )
176 ).toBe(true)
7c0ba920
JB
177 })
178
179 it('Verify that error handling is working properly:async', async () => {
180 const data = { f: 10 }
4f0b85b3
JB
181 let taskError
182 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
183 taskError = e
184 })
7c0ba920
JB
185 let inError
186 try {
187 await asyncErrorPool.execute(data)
188 } catch (e) {
189 inError = e
190 }
191 expect(inError).toBeDefined()
192 expect(inError).toBeInstanceOf(Error)
193 expect(inError.message).toBeDefined()
8620fb25 194 expect(typeof inError.message === 'string').toBe(true)
985d0e79
JB
195 expect(inError.message).toBe('Error Message from ThreadWorker:async')
196 expect(taskError).toStrictEqual({
ff128cc9 197 name: 'default',
985d0e79
JB
198 message: new Error('Error Message from ThreadWorker:async'),
199 data
200 })
18482cec
JB
201 expect(
202 asyncErrorPool.workerNodes.some(
f59e1027 203 workerNode => workerNode.usage.tasks.failed === 1
18482cec
JB
204 )
205 ).toBe(true)
106744f7 206 })
207
7784f548 208 it('Verify that async function is working properly', async () => {
209 const data = { f: 10 }
15e5141f 210 const startTime = performance.now()
7784f548 211 const result = await asyncPool.execute(data)
15e5141f 212 const usedTime = performance.now() - startTime
e1ffb94f 213 expect(result).toStrictEqual(data)
32d490eb 214 expect(usedTime).toBeGreaterThanOrEqual(2000)
7784f548 215 })
216
506c2a14 217 it('Shutdown test', async () => {
bac873bd 218 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
1f9a5a44 219 await pool.destroy()
bdacc2d2
JB
220 const numberOfExitEvents = await exitPromise
221 expect(numberOfExitEvents).toBe(numberOfThreads)
506c2a14 222 })
223
90082c8c 224 it('Verify that thread pool options are checked', async () => {
f59e1027 225 const workerFilePath = './tests/worker-files/thread/testWorker.js'
90082c8c
JB
226 let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
227 expect(pool1.opts.workerOptions).toBeUndefined()
228 await pool1.destroy()
229 pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
230 workerOptions: {
231 env: { TEST: 'test' },
232 name: 'test'
233 }
234 })
235 expect(pool1.opts.workerOptions).toStrictEqual({
236 env: { TEST: 'test' },
237 name: 'test'
238 })
239 await pool1.destroy()
240 })
241
506c2a14 242 it('Should work even without opts in input', async () => {
76b1e974 243 const pool1 = new FixedThreadPool(
e1ffb94f 244 numberOfThreads,
76b1e974
S
245 './tests/worker-files/thread/testWorker.js'
246 )
6db75ad9 247 const res = await pool1.execute()
30b963d4 248 expect(res).toStrictEqual({ ok: 1 })
0e2503fc
JB
249 // We need to clean up the resources after our test
250 await pool1.destroy()
506c2a14 251 })
8d3782fa
JB
252
253 it('Verify that a pool with zero worker fails', async () => {
254 expect(
255 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
2431bdb4 256 ).toThrowError('Cannot instantiate a fixed pool with zero worker')
8d3782fa 257 })
506c2a14 258})