aeebc4a7a623a1151f37f41aaf7f56ddcfcadbc1
1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const tasksConcurrency
= 2
9 const pool
= new FixedThreadPool(
11 './tests/worker-files/thread/testWorker.js',
13 errorHandler
: (e
) => console
.error(e
)
16 const queuePool
= new FixedThreadPool(
18 './tests/worker-files/thread/testWorker.js',
20 enableTasksQueue
: true,
22 concurrency
: tasksConcurrency
24 errorHandler
: (e
) => console
.error(e
)
27 const emptyPool
= new FixedThreadPool(
29 './tests/worker-files/thread/emptyWorker.js',
30 { exitHandler
: () => console
.info('empty pool worker exited') }
32 const echoPool
= new FixedThreadPool(
34 './tests/worker-files/thread/echoWorker.js'
36 const errorPool
= new FixedThreadPool(
38 './tests/worker-files/thread/errorWorker.js',
40 errorHandler
: (e
) => console
.error(e
)
43 const asyncErrorPool
= new FixedThreadPool(
45 './tests/worker-files/thread/asyncErrorWorker.js',
47 errorHandler
: (e
) => console
.error(e
)
50 const asyncPool
= new FixedThreadPool(
52 './tests/worker-files/thread/asyncWorker.js'
55 after('Destroy all pools', async () => {
56 // We need to clean up the resources after our test
57 await echoPool
.destroy()
58 await asyncPool
.destroy()
59 await errorPool
.destroy()
60 await asyncErrorPool
.destroy()
61 await emptyPool
.destroy()
62 await queuePool
.destroy()
65 it('Verify that the function is executed in a worker thread', async () => {
66 let result
= await pool
.execute({
67 function: TaskFunctions
.fibonacci
69 expect(result
).toBe(75025)
70 result
= await pool
.execute({
71 function: TaskFunctions
.factorial
73 expect(result
).toBe(9.33262154439441e157
)
76 it('Verify that is possible to invoke the execute() method without input', async () => {
77 const result
= await pool
.execute()
78 expect(result
).toStrictEqual({ ok
: 1 })
81 it("Verify that 'ready' event is emitted", async () => {
82 const pool1
= new FixedThreadPool(
84 './tests/worker-files/thread/testWorker.js',
86 errorHandler
: (e
) => console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
91 await
waitPoolEvents(pool1
, 'ready', 1)
92 expect(poolReady
).toBe(1)
95 it("Verify that 'busy' event is emitted", () => {
97 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
98 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
101 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
102 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
103 expect(poolBusy
).toBe(numberOfThreads
+ 1)
106 it('Verify that tasks queuing is working', async () => {
107 const promises
= new Set()
108 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
109 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
110 promises
.add(queuePool
.execute())
112 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
113 for (const workerNode
of queuePool
.workerNodes
) {
114 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
119 expect(workerNode
.usage
.tasks
.queued
).toBe(
120 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
123 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
126 expect(queuePool
.info
.executingTasks
).toBe(
127 numberOfThreads
* queuePool
.opts
.tasksQueueOptions
.concurrency
129 expect(queuePool
.info
.queuedTasks
).toBe(
131 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
133 expect(queuePool
.info
.maxQueuedTasks
).toBe(
135 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
137 expect(queuePool
.info
.backPressure
).toBe(false)
138 await Promise
.all(promises
)
139 for (const workerNode
of queuePool
.workerNodes
) {
140 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
141 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
142 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
143 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
144 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
149 it('Verify that is possible to have a worker that return undefined', async () => {
150 const result
= await emptyPool
.execute()
151 expect(result
).toBeUndefined()
154 it('Verify that data are sent to the worker correctly', async () => {
155 const data
= { f
: 10 }
156 const result
= await echoPool
.execute(data
)
157 expect(result
).toStrictEqual(data
)
160 it('Verify that transferable objects are sent to the worker correctly', async () => {
164 result
= await pool
.execute(undefined, undefined, [
166 new MessageChannel().port1
171 expect(result
).toStrictEqual({ ok
: 1 })
172 expect(error
).toBeUndefined()
174 result
= await pool
.execute(undefined, undefined, [
175 new SharedArrayBuffer(16)
180 expect(result
).toStrictEqual({ ok
: 1 })
181 expect(error
).toStrictEqual(
182 new TypeError('Found invalid object in transferList')
186 it('Verify that error handling is working properly:sync', async () => {
187 const data
= { f
: 10 }
189 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
194 await errorPool
.execute(data
)
198 expect(inError
).toBeDefined()
199 expect(inError
).toBeInstanceOf(Error
)
200 expect(inError
.message
).toBeDefined()
201 expect(typeof inError
.message
=== 'string').toBe(true)
202 expect(inError
.message
).toBe('Error Message from ThreadWorker')
203 expect(taskError
).toStrictEqual({
205 message
: new Error('Error Message from ThreadWorker'),
209 errorPool
.workerNodes
.some(
210 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
215 it('Verify that error handling is working properly:async', async () => {
216 const data
= { f
: 10 }
218 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
223 await asyncErrorPool
.execute(data
)
227 expect(inError
).toBeDefined()
228 expect(inError
).toBeInstanceOf(Error
)
229 expect(inError
.message
).toBeDefined()
230 expect(typeof inError
.message
=== 'string').toBe(true)
231 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
232 expect(taskError
).toStrictEqual({
234 message
: new Error('Error Message from ThreadWorker:async'),
238 asyncErrorPool
.workerNodes
.some(
239 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
244 it('Verify that async function is working properly', async () => {
245 const data
= { f
: 10 }
246 const startTime
= performance
.now()
247 const result
= await asyncPool
.execute(data
)
248 const usedTime
= performance
.now() - startTime
249 expect(result
).toStrictEqual(data
)
250 expect(usedTime
).toBeGreaterThanOrEqual(2000)
253 it('Shutdown test', async () => {
254 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
256 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
258 const numberOfExitEvents
= await exitPromise
259 expect(numberOfExitEvents
).toBe(numberOfThreads
)
260 expect(poolDestroy
).toBe(1)
263 it('Verify that thread pool options are checked', async () => {
264 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
265 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
266 expect(pool1
.opts
.workerOptions
).toBeUndefined()
267 await pool1
.destroy()
268 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
270 env
: { TEST
: 'test' },
274 expect(pool1
.opts
.workerOptions
).toStrictEqual({
275 env
: { TEST
: 'test' },
278 await pool1
.destroy()
281 it('Should work even without opts in input', async () => {
282 const pool1
= new FixedThreadPool(
284 './tests/worker-files/thread/testWorker.js'
286 const res
= await pool1
.execute()
287 expect(res
).toStrictEqual({ ok
: 1 })
288 // We need to clean up the resources after our test
289 await pool1
.destroy()
292 it('Verify that a pool with zero worker fails', async () => {
294 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
295 ).toThrowError('Cannot instantiate a fixed pool with zero worker')