/**
 * Mocha test suite for `FixedThreadPool`.
 *
 * Covers task execution, the `ready` / `busy` / `taskError` pool events,
 * tasks queuing, error propagation from sync and async workers, shutdown and
 * constructor option checks.
 *
 * NOTE(review): this file was restored from a damaged copy in which tokens
 * were split across lines and some lines were missing. Every statement tagged
 * `NOTE(review)` below was reconstructed rather than read from the damaged
 * copy and must be confirmed against the project history.
 */
const { expect } = require('expect')
const { FixedThreadPool, PoolEvents } = require('../../../lib')
const { WorkerFunctions } = require('../../test-types')
const { waitPoolEvents, waitWorkerEvents } = require('../../test-utils')

describe('Fixed thread pool test suite', () => {
  // Pool fixtures shared by the tests below.
  // NOTE(review): the first constructor argument of each fixture was missing;
  // `numberOfThreads` is presumed, matching the
  // `new FixedThreadPool(numberOfThreads, workerFilePath)` calls further down.
  const numberOfThreads = 6
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const queuePool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      enableTasksQueue: true,
      // NOTE(review): `tasksQueueOptions` was missing from the damaged copy.
      // The queuing test reads `queuePool.opts.tasksQueueOptions.concurrency`,
      // so the option is presumed to be set here; the value 8 is a guess --
      // TODO confirm.
      tasksQueueOptions: {
        concurrency: 8
      },
      errorHandler: e => console.error(e)
    }
  )
  const emptyPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/emptyWorker.js',
    { exitHandler: () => console.log('empty pool worker exited') }
  )
  const echoPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/echoWorker.js'
  )
  const errorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/errorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncErrorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncErrorWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  const asyncPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncWorker.js'
  )

  after('Destroy all pools', async () => {
    // We need to clean up the resources after our test.
    // `pool` is not destroyed here: the 'Shutdown test' case handles it.
    await echoPool.destroy()
    await asyncPool.destroy()
    await errorPool.destroy()
    await asyncErrorPool.destroy()
    await emptyPool.destroy()
    await queuePool.destroy()
  })

  it('Verify that the function is executed in a worker thread', async () => {
    let result = await pool.execute({
      function: WorkerFunctions.fibonacci
    })
    expect(result).toBe(75025)
    result = await pool.execute({
      function: WorkerFunctions.factorial
    })
    expect(result).toBe(9.33262154439441e157)
  })

  it('Verify that is possible to invoke the execute() method without input', async () => {
    const result = await pool.execute()
    expect(result).toStrictEqual({ ok: 1 })
  })

  it("Verify that 'ready' event is emitted", async () => {
    const pool1 = new FixedThreadPool(
      numberOfThreads,
      './tests/worker-files/thread/testWorker.js',
      {
        errorHandler: e => console.error(e)
      }
    )
    // Counts how many times the pool reports itself ready.
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, 'ready', 1)
    expect(poolReady).toBe(1)
  })

  it("Verify that 'busy' event is emitted", () => {
    let poolBusy = 0
    pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
    for (let i = 0; i < numberOfThreads * 2; i++) {
      // NOTE(review): the loop body was missing; a task submission is presumed
      // from the comment below -- TODO confirm.
      pool.execute()
    }
    // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
    // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
    expect(poolBusy).toBe(numberOfThreads + 1)
  })

  it('Verify that tasks queuing is working', async () => {
    const promises = new Set()
    const maxMultiplier = 2
    for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
      promises.add(queuePool.execute())
    }
    expect(promises.size).toBe(numberOfThreads * maxMultiplier)
    // Before any task settles: nothing is executed yet and each worker node
    // holds queued tasks.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
        queuePool.opts.tasksQueueOptions.concurrency
      )
      expect(workerNode.usage.tasks.executed).toBe(0)
      expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
    }
    expect(queuePool.info.executingTasks).toBe(numberOfThreads)
    expect(queuePool.info.queuedTasks).toBe(
      numberOfThreads * maxMultiplier - numberOfThreads
    )
    expect(queuePool.info.maxQueuedTasks).toBe(
      numberOfThreads * maxMultiplier - numberOfThreads
    )
    await Promise.all(promises)
    // After all tasks settled: queues are drained and nothing is executing.
    for (const workerNode of queuePool.workerNodes) {
      expect(workerNode.usage.tasks.executing).toBe(0)
      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
      expect(workerNode.usage.tasks.queued).toBe(0)
      expect(workerNode.usage.tasks.maxQueued).toBe(1)
    }
  })

  it('Verify that is possible to have a worker that return undefined', async () => {
    const result = await emptyPool.execute()
    expect(result).toBeUndefined()
  })

  it('Verify that data are sent to the worker correctly', async () => {
    const data = { f: 10 }
    const result = await echoPool.execute(data)
    expect(result).toStrictEqual(data)
  })

  it('Verify that error handling is working properly:sync', async () => {
    const data = { f: 10 }
    // Payload of the last `taskError` pool event.
    let taskError
    errorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    // Rejection raised by `execute()`.
    let inError
    try {
      await errorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toBeDefined()
    expect(typeof inError.message === 'string').toBe(true)
    expect(inError.message).toBe('Error Message from ThreadWorker')
    expect(taskError).toStrictEqual({
      // NOTE(review): the properties around `message` were missing;
      // `workerId` and `data` are presumed -- TODO confirm.
      workerId: expect.any(Number),
      message: new Error('Error Message from ThreadWorker'),
      data
    })
    // NOTE(review): the `expect(...).toBe(true)` wrapper was missing and is
    // presumed -- TODO confirm.
    expect(
      errorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that error handling is working properly:async', async () => {
    const data = { f: 10 }
    // Payload of the last `taskError` pool event.
    let taskError
    asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
      taskError = e
    })
    // Rejection raised by `execute()`.
    let inError
    try {
      await asyncErrorPool.execute(data)
    } catch (e) {
      inError = e
    }
    expect(inError).toBeDefined()
    expect(inError).toBeInstanceOf(Error)
    expect(inError.message).toBeDefined()
    expect(typeof inError.message === 'string').toBe(true)
    expect(inError.message).toBe('Error Message from ThreadWorker:async')
    expect(taskError).toStrictEqual({
      // NOTE(review): the properties around `message` were missing;
      // `workerId` and `data` are presumed -- TODO confirm.
      workerId: expect.any(Number),
      message: new Error('Error Message from ThreadWorker:async'),
      data
    })
    // NOTE(review): the `expect(...).toBe(true)` wrapper was missing and is
    // presumed -- TODO confirm.
    expect(
      asyncErrorPool.workerNodes.some(
        workerNode => workerNode.usage.tasks.failed === 1
      )
    ).toBe(true)
  })

  it('Verify that async function is working properly', async () => {
    const data = { f: 10 }
    const startTime = performance.now()
    const result = await asyncPool.execute(data)
    const usedTime = performance.now() - startTime
    expect(result).toStrictEqual(data)
    // The test expects the task to take at least 2000 ms to resolve.
    expect(usedTime).toBeGreaterThanOrEqual(2000)
  })

  it('Shutdown test', async () => {
    // Start listening for worker exits before triggering the shutdown.
    const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
    // NOTE(review): the statement at this position was missing; destroying
    // the pool is the presumed trigger for the awaited 'exit' events -- TODO
    // confirm.
    await pool.destroy()
    const numberOfExitEvents = await exitPromise
    expect(numberOfExitEvents).toBe(numberOfThreads)
  })

  it('Verify that thread pool options are checked', async () => {
    const workerFilePath = './tests/worker-files/thread/testWorker.js'
    let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
    expect(pool1.opts.workerOptions).toBeUndefined()
    await pool1.destroy()
    pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
      // NOTE(review): the `workerOptions` key and the `name` property were
      // missing; both are presumed from the assertion on
      // `pool1.opts.workerOptions` below -- TODO confirm.
      workerOptions: {
        env: { TEST: 'test' },
        name: 'test'
      }
    })
    expect(pool1.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
    await pool1.destroy()
  })

  it('Should work even without opts in input', async () => {
    const pool1 = new FixedThreadPool(
      numberOfThreads,
      './tests/worker-files/thread/testWorker.js'
    )
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
    // We need to clean up the resources after our test
    await pool1.destroy()
  })

  it('Verify that a pool with zero worker fails', async () => {
    expect(
      () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
    ).toThrowError('Cannot instantiate a fixed pool with zero worker')
  })
})