1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
// Number of workers every fixed pool in this suite is created with.
const numberOfThreads = 6
// Per-worker task concurrency configured on `queuePool`.
const tasksConcurrency = 2
// General purpose pool shared by several tests; it is not part of the
// `after` hook cleanup.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool with the tasks queue enabled; the tests read the concurrency back
// through `queuePool.opts.tasksQueueOptions.concurrency`.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: tasksConcurrency
    },
    errorHandler: (e) => console.error(e)
  }
)
// Pool whose task result is expected to be undefined.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)
// Pool whose task result is expected to equal the submitted data.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)
// Pool used by the synchronous error handling test.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool used by the asynchronous error handling test.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool used by the asynchronous task function test.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
// Suite-level cleanup. The pools are destroyed one after the other; the
// shared `pool` fixture is intentionally not listed here.
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Submits the fibonacci and then the factorial task function to the shared
// pool and checks the value each one resolves with.
it('Verify that the function is executed in a worker thread', async () => {
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
// execute() is called with no argument at all; the shared pool is expected
// to resolve with `{ ok: 1 }`.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// Creates a dedicated pool, counts its `ready` events and expects exactly one
// once waitPoolEvents() has resolved.
it("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  let poolReady = 0
  pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
  try {
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // pool1 is local to this test and is not covered by the suite-level
    // `after` hook, so it has to be destroyed here, even when the
    // assertion above fails.
    await pool1.destroy()
  }
})
// Submits twice as many tasks as there are workers to the shared pool and
// counts how many times the `busy` event fires.
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfThreads + 1)
})
// Floods the queue-enabled pool with `maxMultiplier` tasks per worker, checks
// the per-worker and pool-wide counters while the tasks are still pending,
// then checks them again once every task has settled.
it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  // Read once: the configured per-worker concurrency is used by every
  // expectation below.
  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks each worker cannot start immediately and therefore has to queue.
  const queuedPerWorker = maxMultiplier - concurrency
  for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(numberOfThreads * maxMultiplier)
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(workerNode.usage.tasks.executed).toBe(0)
    expect(workerNode.usage.tasks.queued).toBe(queuedPerWorker)
    expect(workerNode.usage.tasks.maxQueued).toBe(queuedPerWorker)
  }
  // Pool-wide figures are the per-worker figures summed over all workers.
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  await Promise.all(promises)
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBe(0)
    expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
    expect(workerNode.usage.tasks.queued).toBe(0)
    // The high-water mark is kept after the queue has drained.
    expect(workerNode.usage.tasks.maxQueued).toBe(queuedPerWorker)
  }
})
// A task executed on `emptyPool` is expected to resolve with undefined.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// The payload handed to `echoPool` is expected to come back structurally
// unchanged.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// Passes a transfer list as the third argument of pool.execute(): first a
// list that is expected to be accepted, then one containing a
// SharedArrayBuffer that is expected to be rejected with a TypeError.
// NOTE(review): this block is incomplete as extracted. The declarations of
// `result` and `error`, the statements that assign `error`, one transfer-list
// entry ahead of `new MessageChannel().port1`, and the closing delimiters are
// not visible here; restore them from the upstream file before editing.
162 it('Verify that transferable objects are sent to the worker correctly', async () => {
166 result
= await pool
.execute(undefined, undefined, [
168 new MessageChannel().port1
173 expect(result
).toStrictEqual({ ok
: 1 })
174 expect(error
).toBeUndefined()
// NOTE(review): no reset of `result` is visible before this second call. If
// execute() rejects, `result` keeps the value from the first call and the
// `{ ok: 1 }` expectation below is satisfied by stale data; confirm whether
// that is intended.
176 result
= await pool
.execute(undefined, undefined, [
177 new SharedArrayBuffer(16)
182 expect(result
).toStrictEqual({ ok
: 1 })
183 expect(error
).toStrictEqual(
184 new TypeError('Found invalid object in transferList')
// Executes a task on `errorPool`, then checks the error seen by the caller,
// the payload received by the `taskError` listener, and that some worker node
// reports exactly one failed task.
// NOTE(review): this block is incomplete as extracted. The declarations of
// `taskError` and `inError`, the listener body, the statements that assign
// `inError`, the remaining keys of the expected `taskError` object, and the
// closing delimiters are not visible here; restore them from the upstream
// file before editing.
188 it('Verify that error handling is working properly:sync', async () => {
189 const data
= { f
: 10 }
// Listener for the pool's task error event.
191 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
196 await errorPool
.execute(data
)
// Error observed by the caller of execute().
200 expect(inError
).toBeDefined()
201 expect(inError
).toBeInstanceOf(Error
)
202 expect(inError
.message
).toBeDefined()
203 expect(typeof inError
.message
=== 'string').toBe(true)
204 expect(inError
.message
).toBe('Error Message from ThreadWorker')
// Payload delivered to the taskError listener; only the `message` key is
// visible here.
205 expect(taskError
).toStrictEqual({
207 message
: new Error('Error Message from ThreadWorker'),
// Looks for a worker node whose failed tasks counter equals 1.
211 errorPool
.workerNodes
.some(
212 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Executes a task on `asyncErrorPool`, then checks the error seen by the
// caller, the payload received by the `taskError` listener, and that some
// worker node reports exactly one failed task.
// NOTE(review): this block is incomplete as extracted. The declarations of
// `taskError` and `inError`, the listener body, the statements that assign
// `inError`, the remaining keys of the expected `taskError` object, and the
// closing delimiters are not visible here; restore them from the upstream
// file before editing.
217 it('Verify that error handling is working properly:async', async () => {
218 const data
= { f
: 10 }
// Listener for the pool's task error event.
220 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
225 await asyncErrorPool
.execute(data
)
// Error observed by the caller of execute().
229 expect(inError
).toBeDefined()
230 expect(inError
).toBeInstanceOf(Error
)
231 expect(inError
.message
).toBeDefined()
232 expect(typeof inError
.message
=== 'string').toBe(true)
233 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
// Payload delivered to the taskError listener; only the `message` key is
// visible here.
234 expect(taskError
).toStrictEqual({
236 message
: new Error('Error Message from ThreadWorker:async'),
// Looks for a worker node whose failed tasks counter equals 1.
240 asyncErrorPool
.workerNodes
.some(
241 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Executes a task on `asyncPool`, expects the input to be echoed back and the
// round trip to take at least 2000 ms as measured with performance.now().
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Destroys the shared pool and expects one worker `exit` event per thread
// plus a single pool `destroy` event.
it('Shutdown test', async () => {
  // Subscribe before destroying so that no exit event can be missed.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(numberOfExitEvents).toBe(numberOfThreads)
  expect(poolDestroy).toBe(1)
})
// Builds a pool without options and expects `opts.workerOptions` to be
// undefined, then builds one with options and expects `opts.workerOptions` to
// match the expected object. Each pool is destroyed after its check.
// NOTE(review): this block is incomplete as extracted. The key wrapping the
// `env` entry in the constructor options, any entry following `env` (each
// `env` line ends with a comma), and the closing delimiters are not visible
// here; restore them from the upstream file before editing.
265 it('Verify that thread pool options are checked', async () => {
266 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
267 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
268 expect(pool1
.opts
.workerOptions
).toBeUndefined()
269 await pool1
.destroy()
// Second pool, this time created with an options object.
270 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
272 env
: { TEST
: 'test' },
276 expect(pool1
.opts
.workerOptions
).toStrictEqual({
277 env
: { TEST
: 'test' },
280 await pool1
.destroy()
// A pool built without an options argument must still execute a task.
it('Should work even without opts in input', async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, even when the
    // assertion above fails.
    await pool1.destroy()
  }
})
// Constructing a FixedThreadPool with 0 workers is expected to throw with the
// message asserted below.
// NOTE(review): the `expect(` opener and the closing delimiters of this test
// are not visible in this extract; restore them from the upstream file.
// NOTE(review): `toThrowError` is an alias of `toThrow` in the `expect`
// package; consider switching once the block is restored.
294 it('Verify that a pool with zero worker fails', async () => {
296 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
297 ).toThrowError('Cannot instantiate a fixed pool with zero worker')