fix: ensure worker node destroy semantic is always the same
[poolifier.git] / tests / pools / thread / fixed.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
cdace0e5 2const { FixedThreadPool, PoolEvents } = require('../../../lib')
2d2e32c2 3const { WorkerFunctions } = require('../../test-types')
1dcbfefc 4const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')
506c2a14 5
079de991 6describe('Fixed thread pool test suite', () => {
e1ffb94f
JB
// Number of worker threads every fixed pool of this suite is created with.
const numberOfThreads = 6
// Error handler shared by the pools that register one: print the error to stderr.
const logWorkerError = e => console.error(e)

// General purpose pool, used by most tests and destroyed by the 'Shutdown test'.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)

// Same worker file as `pool`, with the tasks queue enabled.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    errorHandler: logWorkerError
  }
)

// Pool backed by emptyWorker.js; logs a message whenever one of its workers exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  {
    exitHandler: () => console.log('empty pool worker exited')
  }
)

// Pool backed by echoWorker.js, created with default options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pool backed by errorWorker.js.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool backed by asyncErrorWorker.js.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool backed by asyncWorker.js, created with default options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
53
0e2503fc
JB
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test.
  // `pool` is not listed here: the 'Shutdown test' destroys it.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  // Destroy sequentially, in the listed order.
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
63
it('Verify that the function is executed in a worker thread', async () => {
  // Run the fibonacci task function and check its result.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // Run the factorial task function and check its result.
  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
74
it('Verify that is possible to invoke the execute() method without input', async () => {
  // No task data is passed; the test worker is expected to answer { ok: 1 }.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
79
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is needed so that its `ready` event can still be observed.
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, 'ready', 1)
    expect(poolReady).toBe(1)
  } finally {
    // We need to clean up the resources after our test, even when an
    // expectation fails: this pool is not covered by the suite-level hook.
    await pool1.destroy()
  }
})
93
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  // The count is captured right after submission, i.e. at the point where it was originally asserted.
  const busyEventsOnSubmission = poolBusy
  // Wait for every submitted task to settle so that none is left floating
  // on the shared pool once this test has finished.
  await Promise.all(promises)
  expect(busyEventsOnSubmission).toBe(numberOfThreads + 1)
})
104
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfThreads * maxMultiplier
  const expectedQueuedTasks = submittedTasks - numberOfThreads

  // Submit `maxMultiplier` times more tasks than there are workers.
  const pendingTasks = new Set(
    Array.from({ length: submittedTasks }, () => queuePool.execute())
  )
  expect(pendingTasks.size).toBe(submittedTasks)

  // Right after submission: nothing has completed yet and every worker node has queued tasks.
  const { concurrency } = queuePool.opts.tasksQueueOptions
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBeGreaterThan(0)
    expect(usage.tasks.maxQueued).toBeGreaterThan(0)
  }

  // Pool-wide counters while the tasks are still pending.
  const { executingTasks, queuedTasks, maxQueuedTasks } = queuePool.info
  expect(executingTasks).toBe(numberOfThreads)
  expect(queuedTasks).toBe(expectedQueuedTasks)
  expect(maxQueuedTasks).toBe(expectedQueuedTasks)

  await Promise.all(pendingTasks)

  // Once every task has settled the queues must be drained.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBe(0)
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(1)
  }
})
136
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task run by `emptyPool` is expected to resolve with no value.
  expect(await emptyPool.execute()).toBeUndefined()
})
141
it('Verify that data are sent to the worker correctly', async () => {
  // The task run by `echoPool` is expected to send back the data it was given.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
147
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker'

  // Record the last payload emitted with the pool `taskError` event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Record the rejection of the task; stays undefined if the task does not reject.
  let rejection
  try {
    await errorPool.execute(payload)
  } catch (e) {
    rejection = e
  }

  // The rejection must be an Error carrying the expected message.
  expect(rejection).toBeDefined()
  expect(rejection).toBeInstanceOf(Error)
  expect(rejection.message).toBeDefined()
  expect(typeof rejection.message).toBe('string')
  expect(rejection.message).toBe(expectedMessage)

  // The emitted event describes the failed task.
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: new Error(expectedMessage),
    data: payload
  })

  // At least one worker node accounted for exactly one failed task.
  const hasWorkerWithOneFailure = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
176
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker:async'

  // Record the last payload emitted with the pool `taskError` event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
    emittedTaskError = e
  })

  // Record the rejection of the task; stays undefined if the task does not reject.
  let rejection
  try {
    await asyncErrorPool.execute(payload)
  } catch (e) {
    rejection = e
  }

  // The rejection must be an Error carrying the expected message.
  expect(rejection).toBeDefined()
  expect(rejection).toBeInstanceOf(Error)
  expect(rejection.message).toBeDefined()
  expect(typeof rejection.message).toBe('string')
  expect(rejection.message).toBe(expectedMessage)

  // The emitted event describes the failed task.
  expect(emittedTaskError).toStrictEqual({
    name: 'default',
    message: new Error(expectedMessage),
    data: payload
  })

  // At least one worker node accounted for exactly one failed task.
  const hasWorkerWithOneFailure = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
205
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure how long the task takes to come back.
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // NOTE(review): the 2000 ms lower bound presumably mirrors a delay inside asyncWorker.js - confirm.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
214
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before the pool is destroyed.
  const pendingExitEvents = waitWorkerEvents(pool, 'exit', numberOfThreads)
  await pool.destroy()
  // One 'exit' event is expected per worker thread.
  expect(await pendingExitEvents).toBe(numberOfThreads)
})
221
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'

  // Without options, no worker options are set on the pool.
  const defaultOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(defaultOptsPool.opts.workerOptions).toBeUndefined()
  await defaultOptsPool.destroy()

  // Worker options given at construction are kept as provided.
  const customOptsPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test'
    }
  })
  expect(customOptsPool.opts.workerOptions).toStrictEqual({
    env: { TEST: 'test' },
    name: 'test'
  })
  await customOptsPool.destroy()
})
239
it('Should work even without opts in input', async () => {
  // Build a pool without passing any options object.
  const optionlessPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(await optionlessPool.execute()).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
8d3782fa
JB
250
it('Verify that a pool with zero worker fails', async () => {
  // Wrapped in a function so that the constructor error is raised inside `expect`.
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  expect(createZeroWorkerPool).toThrowError(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
506c2a14 256})