1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedThreadPool(
11 './tests/worker-files/thread/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedThreadPool(
18 './tests/worker-files/thread/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedThreadPool(
29 './tests/worker-files/thread/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedThreadPool(
34 './tests/worker-files/thread/echoWorker.js'
36 const errorPool
= new FixedThreadPool(
38 './tests/worker-files/thread/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedThreadPool(
45 './tests/worker-files/thread/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedThreadPool(
52 './tests/worker-files/thread/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker thread', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedThreadPool(
84 './tests/worker-files/thread/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, 'ready', 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", () => {
97 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
98 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
101 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
102 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
103 expect(poolBusy
).toBe(numberOfThreads
+ 1)
106 it('Verify that tasks queuing is working', async () => {
107 const promises
= new Set()
108 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
109 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
110 promises
.add(queuePool
.execute())
112 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
113 for (const workerNode
of queuePool
.workerNodes
) {
114 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
119 expect(workerNode
.usage
.tasks
.queued
).toBe(
120 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
123 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
126 expect(queuePool
.info
.executingTasks
).toBe(
127 numberOfThreads
* queuePool
.opts
.tasksQueueOptions
.concurrency
129 expect(queuePool
.info
.queuedTasks
).toBe(
131 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
133 expect(queuePool
.info
.maxQueuedTasks
).toBe(
135 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
137 await Promise
.all(promises
)
138 for (const workerNode
of queuePool
.workerNodes
) {
139 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
140 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
141 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
142 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
143 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
148 it('Verify that is possible to have a worker that return undefined', async () => {
149 const result
= await emptyPool
.execute()
150 expect(result
).toBeUndefined()
153 it('Verify that data are sent to the worker correctly', async () => {
154 const data
= { f
: 10 }
155 const result
= await echoPool
.execute(data
)
156 expect(result
).toStrictEqual(data
)
159 it('Verify that transferable objects are sent to the worker correctly', async () => {
163 result
= await pool
.execute(undefined, undefined, [
165 new MessageChannel().port1
170 expect(result
).toStrictEqual({ ok
: 1 })
171 expect(error
).toBeUndefined()
173 result
= await pool
.execute(undefined, undefined, [
174 new SharedArrayBuffer(16)
179 expect(result
).toStrictEqual({ ok
: 1 })
180 expect(error
).toStrictEqual(
181 new TypeError('Found invalid object in transferList')
185 it('Verify that error handling is working properly:sync', async () => {
186 const data
= { f
: 10 }
188 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
193 await errorPool
.execute(data
)
197 expect(inError
).toBeDefined()
198 expect(inError
).toBeInstanceOf(Error
)
199 expect(inError
.message
).toBeDefined()
200 expect(typeof inError
.message
=== 'string').toBe(true)
201 expect(inError
.message
).toBe('Error Message from ThreadWorker')
202 expect(taskError
).toStrictEqual({
204 message
: new Error('Error Message from ThreadWorker'),
208 errorPool
.workerNodes
.some(
209 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
214 it('Verify that error handling is working properly:async', async () => {
215 const data
= { f
: 10 }
217 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
222 await asyncErrorPool
.execute(data
)
226 expect(inError
).toBeDefined()
227 expect(inError
).toBeInstanceOf(Error
)
228 expect(inError
.message
).toBeDefined()
229 expect(typeof inError
.message
=== 'string').toBe(true)
230 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
231 expect(taskError
).toStrictEqual({
233 message
: new Error('Error Message from ThreadWorker:async'),
237 asyncErrorPool
.workerNodes
.some(
238 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
243 it('Verify that async function is working properly', async () => {
244 const data
= { f
: 10 }
245 const startTime
= performance
.now()
246 const result
= await asyncPool
.execute(data
)
247 const usedTime
= performance
.now() - startTime
248 expect(result
).toStrictEqual(data
)
249 expect(usedTime
).toBeGreaterThanOrEqual(2000)
252 it('Shutdown test', async () => {
253 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
255 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
257 const numberOfExitEvents
= await exitPromise
258 expect(numberOfExitEvents
).toBe(numberOfThreads
)
259 expect(poolDestroy
).toBe(1)
262 it('Verify that thread pool options are checked', async () => {
263 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
264 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
265 expect(pool1
.opts
.workerOptions
).toBeUndefined()
266 await pool1
.destroy()
267 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
269 env
: { TEST
: 'test' },
273 expect(pool1
.opts
.workerOptions
).toStrictEqual({
274 env
: { TEST
: 'test' },
277 await pool1
.destroy()
280 it('Should work even without opts in input', async () => {
281 const pool1
= new FixedThreadPool(
283 './tests/worker-files/thread/testWorker.js'
285 const res
= await pool1
.execute()
286 expect(res
).toStrictEqual({ ok
: 1 })
287 // We need to clean up the resources after our test
288 await pool1
.destroy()
291 it('Verify that a pool with zero worker fails', async () => {
293 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
294 ).toThrowError('Cannot instantiate a fixed pool with zero worker')