1 import { expect } from 'expect'
3 import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
// Test suite for FixedThreadPool. The constants and pools declared first are
// shared by the test cases that follow.
8 describe('Fixed thread pool test suite', () => {
// Worker count passed to FixedThreadPool constructors in this suite and used to
// compute expected values in assertions.
9 const numberOfThreads = 6
// Value given to the `concurrency` option of `queuePool` below.
10 const tasksConcurrency = 2
// General-purpose pool running testWorker.mjs; its errorHandler logs with console.error.
11 const pool = new FixedThreadPool(
13 './tests/worker-files/thread/testWorker.mjs',
15 errorHandler: e => console.error(e)
// Same worker file as `pool`, with the tasks queue enabled; used by the queuing test.
18 const queuePool = new FixedThreadPool(
20 './tests/worker-files/thread/testWorker.mjs',
22 enableTasksQueue: true,
24 concurrency: tasksConcurrency
26 errorHandler: e => console.error(e)
// Pool running emptyWorker.mjs; its exitHandler logs an informational message.
29 const emptyPool = new FixedThreadPool(
31 './tests/worker-files/thread/emptyWorker.mjs',
32 { exitHandler: () => console.info('empty pool worker exited') }
// Pool running echoWorker.mjs; used by the data round-trip test.
34 const echoPool = new FixedThreadPool(
36 './tests/worker-files/thread/echoWorker.mjs'
// Pool running errorWorker.mjs; used by the synchronous error handling test.
38 const errorPool = new FixedThreadPool(
40 './tests/worker-files/thread/errorWorker.mjs',
42 errorHandler: e => console.error(e)
// Pool running asyncErrorWorker.mjs; used by the asynchronous error handling test.
45 const asyncErrorPool = new FixedThreadPool(
47 './tests/worker-files/thread/asyncErrorWorker.mjs',
49 errorHandler: e => console.error(e)
// Pool running asyncWorker.mjs; used by the async function test.
52 const asyncPool = new FixedThreadPool(
54 './tests/worker-files/thread/asyncWorker.mjs'
// Teardown hook: destroys the shared pools one after another.
// `pool` is absent from this list; the 'Shutdown test' case asserts that it is stopped.
57 after('Destroy all pools', async () => {
58 // Release the worker resources held by the shared pools once the tests are done
59 await echoPool.destroy()
60 await asyncPool.destroy()
61 await errorPool.destroy()
62 await asyncErrorPool.destroy()
63 await emptyPool.destroy()
64 await queuePool.destroy()
// Runs the fibonacci and factorial task functions on the shared pool and checks
// the value each call resolves to.
67 it('Verify that the function is executed in a worker thread', async () => {
68 let result = await pool.execute({
69 function: TaskFunctions.fibonacci
// 75025 equals fib(25); the task input is presumably a worker-side default — TODO confirm.
71 expect(result).toBe(75025)
72 result = await pool.execute({
73 function: TaskFunctions.factorial
// 9.33262154439441e157 is 100! as a double; input presumably defaulted as well — TODO confirm.
75 expect(result).toBe(9.33262154439441e157)
// Calls execute() with no arguments on the shared pool and expects the
// result to be exactly { ok: 1 }.
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result = await pool.execute()
80 expect(result).toStrictEqual({ ok: 1 })
// Creates a dedicated pool (shadowing the suite-level `pool`) and checks that a
// listener registered on its emitter observes exactly one `ready` event.
83 it("Verify that 'ready' event is emitted", async () => {
84 const pool = new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.mjs',
88 errorHandler: e => console.error(e)
// No listener is registered on the emitter before the test adds one.
91 expect(pool.emitter.eventNames()).toStrictEqual([])
// Count every `ready` emission.
93 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
// Presumably resolves once one `ready` event has been seen — confirm in test-utils.
94 await waitPoolEvents(pool, PoolEvents.ready, 1)
95 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
96 expect(poolReady).toBe(1)
// Submits numberOfThreads * 2 tasks to the shared pool at once and counts how
// many times the `busy` event fires.
100 it("Verify that 'busy' event is emitted", async () => {
101 const promises = new Set()
102 expect(pool.emitter.eventNames()).toStrictEqual([])
// Count every `busy` emission. This listener stays registered on the shared pool;
// the 'Shutdown test' case expects to find it.
104 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
105 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
// Queue up all tasks without awaiting them individually.
106 for (let i = 0; i < numberOfThreads * 2; i++) {
107 promises.add(pool.execute())
109 await Promise.all(promises)
110 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
111 // So it fires numberOfThreads + 1 times in total for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
112 expect(poolBusy).toBe(numberOfThreads + 1)
// Submits numberOfThreads * maxMultiplier tasks to `queuePool`, then checks the
// per-worker and pool-level task counters twice: right after submission and
// again once every task has settled.
115 it('Verify that tasks queuing is working', async () => {
116 const promises = new Set()
117 const maxMultiplier = 3 // Must be greater than tasksConcurrency
118 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
119 promises.add(queuePool.execute())
121 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
// Phase 1: per-worker counters before any of the submitted promises is awaited.
122 for (const workerNode of queuePool.workerNodes) {
123 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
// A worker should never execute more tasks than the configured queue concurrency.
124 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
125 queuePool.opts.tasksQueueOptions.concurrency
127 expect(workerNode.usage.tasks.executed).toBe(0)
// Each worker is expected to hold maxMultiplier - concurrency tasks in its queue.
128 expect(workerNode.usage.tasks.queued).toBe(
129 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
134 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
135 expect(workerNode.usage.tasks.stolen).toBe(0)
// Phase 1: pool-level counters before completion.
137 expect(queuePool.info.executedTasks).toBe(0)
138 expect(queuePool.info.executingTasks).toBe(
139 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
141 expect(queuePool.info.queuedTasks).toBe(
143 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
145 expect(queuePool.info.maxQueuedTasks).toBe(
147 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
149 expect(queuePool.info.backPressure).toBe(false)
150 expect(queuePool.info.stolenTasks).toBe(0)
// Phase 2: wait for every task, then re-check the counters.
151 await Promise.all(promises)
152 for (const workerNode of queuePool.workerNodes) {
153 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
154 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
155 numberOfThreads * maxMultiplier
// Every worker is expected to have run maxMultiplier tasks and to have an empty queue.
157 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
158 expect(workerNode.usage.tasks.queued).toBe(0)
// maxQueued is expected to keep the same value it had before completion.
159 expect(workerNode.usage.tasks.maxQueued).toBe(
160 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
// The stolen-task counters are only range-checked after completion, not pinned to a value.
162 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
165 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
166 numberOfThreads * maxMultiplier
168 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
169 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
170 numberOfThreads * maxMultiplier
// Phase 2: pool-level counters after completion.
173 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
174 expect(queuePool.info.backPressure).toBe(false)
175 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
176 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
177 numberOfThreads * maxMultiplier
// Executes a task on `emptyPool` (emptyWorker.mjs) and expects the result to be undefined.
181 it('Verify that is possible to have a worker that return undefined', async () => {
182 const result = await emptyPool.execute()
183 expect(result).toBeUndefined()
// Sends an object through `echoPool` and expects a result strictly equal to it.
186 it('Verify that data are sent to the worker correctly', async () => {
187 const data = { f: 10 }
188 const result = await echoPool.execute(data)
189 expect(result).toStrictEqual(data)
// Exercises the third argument of execute() with two kinds of objects:
// a MessagePort, then a SharedArrayBuffer.
192 it('Verify that transferable objects are sent to the worker correctly', async () => {
// First call: a MessagePort is passed; a normal result and no error are expected.
196 result = await pool.execute(undefined, undefined, [
198 new MessageChannel().port1
203 expect(result).toStrictEqual({ ok: 1 })
204 expect(error).toBeUndefined()
// Second call: a SharedArrayBuffer is passed; an Error about an invalid transferList
// entry is expected.
206 result = await pool.execute(undefined, undefined, [
207 new SharedArrayBuffer(16)
// `result` is expected to still equal { ok: 1 } — presumably the value left by the
// first call because this one fails before assigning; TODO confirm.
212 expect(result).toStrictEqual({ ok: 1 })
213 expect(error).toBeInstanceOf(Error)
// The pattern accepts both "object" and "value" wordings of the message.
214 expect(error.message).toMatch(
215 /Found invalid (object|value) in transferList/
// Checks error reporting for a task run on `errorPool`: an Error with the expected
// message, a `taskError` event payload, and a failed-task count on a worker node.
// `inError` is presumably the error thrown by execute() and `taskError` the payload
// captured by the listener — TODO confirm.
219 it('Verify that error handling is working properly:sync', async () => {
220 const data = { f: 10 }
221 expect(errorPool.emitter.eventNames()).toStrictEqual([])
223 errorPool.emitter.on(PoolEvents.taskError, e => {
226 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
229 await errorPool.execute(data)
233 expect(inError).toBeDefined()
234 expect(inError).toBeInstanceOf(Error)
235 expect(inError.message).toBeDefined()
236 expect(typeof inError.message === 'string').toBe(true)
237 expect(inError.message).toBe('Error Message from ThreadWorker')
// The event payload is expected to carry the default task name and the error.
238 expect(taskError).toStrictEqual({
239 name: DEFAULT_TASK_NAME,
240 message: new Error('Error Message from ThreadWorker'),
// Looks for a worker node that recorded exactly one failed task.
244 errorPool.workerNodes.some(
245 workerNode => workerNode.usage.tasks.failed === 1
// Same checks as the sync error handling case, run against `asyncErrorPool`
// and the ':async' error message.
// `inError` is presumably the error thrown by execute() and `taskError` the payload
// captured by the listener — TODO confirm.
250 it('Verify that error handling is working properly:async', async () => {
251 const data = { f: 10 }
252 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
254 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
257 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
262 await asyncErrorPool.execute(data)
266 expect(inError).toBeDefined()
267 expect(inError).toBeInstanceOf(Error)
268 expect(inError.message).toBeDefined()
269 expect(typeof inError.message === 'string').toBe(true)
270 expect(inError.message).toBe('Error Message from ThreadWorker:async')
// The event payload is expected to carry the default task name and the error.
271 expect(taskError).toStrictEqual({
272 name: DEFAULT_TASK_NAME,
273 message: new Error('Error Message from ThreadWorker:async'),
// Looks for a worker node that recorded exactly one failed task.
277 asyncErrorPool.workerNodes.some(
278 workerNode => workerNode.usage.tasks.failed === 1
// Executes a task on `asyncPool`, checks that the input is returned unchanged,
// and checks how long the call took.
283 it('Verify that async function is working properly', async () => {
284 const data = { f: 10 }
285 const startTime = performance.now()
286 const result = await asyncPool.execute(data)
287 const usedTime = performance.now() - startTime
288 expect(result).toStrictEqual(data)
// At least 2000 ms must have elapsed — presumably asyncWorker.mjs waits about
// 2 seconds before answering; TODO confirm in the worker file.
289 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Checks the shutdown of the shared `pool`: one `exit` event per worker, one
// `destroy` event, and a stopped pool with no worker nodes left.
292 it('Shutdown test', async () => {
// Created up front and awaited further down.
293 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
// Only the `busy` listener added by the busy-event test is registered at this point,
// so this case depends on that test having run first.
294 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
// Count every `destroy` emission.
296 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
297 expect(pool.emitter.eventNames()).toStrictEqual([
302 const numberOfExitEvents = await exitPromise
303 expect(pool.started).toBe(false)
304 expect(pool.emitter.eventNames()).toStrictEqual([
308 expect(pool.readyEventEmitted).toBe(false)
309 expect(pool.workerNodes.length).toBe(0)
310 expect(numberOfExitEvents).toBe(numberOfThreads)
311 expect(poolDestroy).toBe(1)
// Checks how `workerOptions` is exposed on `pool.opts`, first for a pool built
// without options and then for one built with an options object.
314 it('Verify that thread pool options are checked', async () => {
315 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
// No options given: workerOptions is expected to be undefined.
316 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
317 expect(pool.opts.workerOptions).toBeUndefined()
// Options given: workerOptions is compared against an object containing `env`.
319 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
321 env: { TEST: 'test' },
325 expect(pool.opts.workerOptions).toStrictEqual({
326 env: { TEST: 'test' },
// Builds a pool from only a worker count and a worker file path, then checks
// that execute() still resolves to { ok: 1 }.
332 it('Should work even without opts in input', async () => {
333 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
334 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
335 const res = await pool.execute()
336 expect(res).toStrictEqual({ ok: 1 })
337 // We need to clean up the resources after our test
// Destroys the worker node at index 0 of a dedicated pool and checks the outcome.
341 it('Verify destroyWorkerNode()', async () => {
342 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
343 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
344 const workerNodeKey = 0
// Listen for the `exit` event of the worker about to be destroyed.
346 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
// destroyWorkerNode() is expected to resolve to undefined.
349 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
// `exitEvent` is presumably incremented by the listener above — TODO confirm.
350 expect(exitEvent).toBe(1)
// The pool is expected to have one worker node fewer afterwards.
351 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
// Constructing a FixedThreadPool with 0 workers is expected to throw with the
// message checked below.
355 it('Verify that a pool with zero worker fails', () => {
357 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
358 ).toThrow('Cannot instantiate a fixed pool with zero worker')