1 import { expect } from 'expect'
2 import { FixedThreadPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
// Test suite for FixedThreadPool. The statements below create the pools shared by the test cases.
// NOTE(review): the embedded line numbers skip values (e.g. 5 -> 7, 10 -> 12 -> 14), so some
// short lines (arguments, braces, closing parentheses) appear to be missing from this listing.
// Confirm against the complete file before changing any code.
7 describe('Fixed thread pool test suite', () => {
// Sizing constants reused by the assertions in the test cases.
8 const numberOfThreads = 6
9 const tasksConcurrency = 2
// `pool`: backed by testWorker.mjs; its error handler logs to console.error.
10 const pool = new FixedThreadPool(
12 './tests/worker-files/thread/testWorker.mjs',
14 errorHandler: e => console.error(e)
// `queuePool`: same worker file, with the tasks queue enabled and a queue
// concurrency of tasksConcurrency.
17 const queuePool = new FixedThreadPool(
19 './tests/worker-files/thread/testWorker.mjs',
21 enableTasksQueue: true,
23 concurrency: tasksConcurrency
25 errorHandler: e => console.error(e)
// `emptyPool`: backed by emptyWorker.mjs; logs a message when a worker exits.
28 const emptyPool = new FixedThreadPool(
30 './tests/worker-files/thread/emptyWorker.mjs',
31 { exitHandler: () => console.info('empty pool worker exited') }
// `echoPool`: backed by echoWorker.mjs.
33 const echoPool = new FixedThreadPool(
35 './tests/worker-files/thread/echoWorker.mjs'
// `errorPool` / `asyncErrorPool`: backed by the error worker files; errors are logged.
37 const errorPool = new FixedThreadPool(
39 './tests/worker-files/thread/errorWorker.mjs',
41 errorHandler: e => console.error(e)
44 const asyncErrorPool = new FixedThreadPool(
46 './tests/worker-files/thread/asyncErrorWorker.mjs',
48 errorHandler: e => console.error(e)
// `asyncPool`: backed by asyncWorker.mjs.
51 const asyncPool = new FixedThreadPool(
53 './tests/worker-files/thread/asyncWorker.mjs'
// Suite teardown: destroys the shared pools one after another.
// NOTE(review): `pool` is not destroyed in the visible lines of this hook; the
// 'Shutdown test' case asserts on its destroy event, so it is presumably destroyed there — confirm.
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
// Runs the fibonacci and factorial task functions on `pool` and checks the returned values.
// NOTE(review): the rest of each task payload (embedded lines 69 and 73) is not visible in
// this listing, so the inputs that yield these expected values cannot be confirmed from here.
66 it('Verify that the function is executed in a worker thread', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
74 expect(result).toBe(9.33262154439441e157)
// Calls execute() with no arguments on `pool` and expects the result { ok: 1 }.
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
// Creates a fresh local pool (shadowing the suite-level `pool`) so that no listener is
// registered yet, then checks that the `ready` event fires exactly once.
// NOTE(review): the declaration of `poolReady` and any cleanup of the local pool are not
// visible in this listing (embedded lines 91 and 96-97 are missing).
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.mjs',
87 errorHandler: e => console.error(e)
// No listener is registered before the test attaches one.
90 expect(pool.emitter.eventNames()).toStrictEqual([])
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
95 expect(poolReady).toBe(1)
// Registers a `busy` listener on the suite-level `pool`, submits numberOfThreads * 2
// tasks without awaiting each one, and counts how often the event fired.
// The listener is not removed in the visible lines; the 'Shutdown test' case expects it
// to still be registered.
// NOTE(review): the declaration of `poolBusy` (embedded line 102) is not visible here.
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
101 expect(pool.emitter.eventNames()).toStrictEqual([])
103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
105 for (let i = 0; i < numberOfThreads * 2; i++) {
106 promises.add(pool.execute())
108 await Promise.all(promises)
109 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
110 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfThreads + 1)
// Submits numberOfThreads * maxMultiplier tasks to `queuePool` without awaiting them,
// checks the per-worker and pool-wide task counters while the tasks are pending, then
// awaits all tasks and checks the counters again.
// NOTE(review): several closing lines and one multiplication operand (embedded lines
// 140 and 144) are not visible in this listing.
114 it('Verify that tasks queuing is working', async () => {
115 const promises = new Set()
116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
117 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
118 promises.add(queuePool.execute())
120 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
// Before completion: per worker, nothing has executed yet, at most `concurrency` tasks are
// running, and maxMultiplier - concurrency tasks are expected in the queue.
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
124 queuePool.opts.tasksQueueOptions.concurrency
126 expect(workerNode.usage.tasks.executed).toBe(0)
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 expect(workerNode.usage.tasks.stolen).toBe(0)
// Before completion: pool-wide aggregates of the same counters.
135 expect(queuePool.info.executedTasks).toBe(0)
136 expect(queuePool.info.executingTasks).toBe(
137 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
139 expect(queuePool.info.queuedTasks).toBe(
141 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
143 expect(queuePool.info.maxQueuedTasks).toBe(
145 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
147 expect(queuePool.info.backPressure).toBe(false)
148 expect(queuePool.info.stolenTasks).toBe(0)
149 await Promise.all(promises)
// After completion: each worker executed maxMultiplier tasks and its queue is empty;
// the stolen counters are only range-checked rather than pinned to an exact value.
150 for (const workerNode of queuePool.workerNodes) {
151 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
152 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
153 numberOfThreads * maxMultiplier
155 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
156 expect(workerNode.usage.tasks.queued).toBe(0)
157 expect(workerNode.usage.tasks.maxQueued).toBe(
158 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
160 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
161 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
162 numberOfThreads * maxMultiplier
165 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
166 expect(queuePool.info.backPressure).toBe(false)
167 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
168 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
169 numberOfThreads * maxMultiplier
// Executes a task on `emptyPool` (emptyWorker.mjs) and expects an undefined result.
173 it('Verify that is possible to have a worker that return undefined', async () => {
174 const result = await emptyPool.execute()
175 expect(result).toBeUndefined()
// Sends an object to `echoPool` (echoWorker.mjs) and expects a structurally equal result.
178 it('Verify that data are sent to the worker correctly', async () => {
179 const data = { f: 10 }
180 const result = await echoPool.execute(data)
181 expect(result).toStrictEqual(data)
// Passes a transfer list as the third argument of execute(): first a list containing a
// MessageChannel port (no error expected), then one containing a SharedArrayBuffer
// (a TypeError 'Found invalid object in transferList' expected).
// NOTE(review): the declarations of `result` / `error`, the remaining transfer-list items
// and what looks like the surrounding try/catch are not visible in this listing — confirm
// how `error` gets assigned against the complete file.
184 it('Verify that transferable objects are sent to the worker correctly', async () => {
188 result = await pool.execute(undefined, undefined, [
190 new MessageChannel().port1
195 expect(result).toStrictEqual({ ok: 1 })
196 expect(error).toBeUndefined()
198 result = await pool.execute(undefined, undefined, [
199 new SharedArrayBuffer(16)
// `result` is expected to still be { ok: 1 } here, while `error` holds the TypeError.
204 expect(result).toStrictEqual({ ok: 1 })
205 expect(error).toStrictEqual(
206 new TypeError('Found invalid object in transferList')
// Executes a task on `errorPool` (errorWorker.mjs) and checks both the error seen by the
// caller (`inError`) and the payload received by the `taskError` listener (`taskError`).
// NOTE(review): the declarations/assignments of `inError` and `taskError`, the body of the
// listener and what looks like a try/catch around execute() are not visible in this listing.
210 it('Verify that error handling is working properly:sync', async () => {
211 const data = { f: 10 }
212 expect(errorPool.emitter.eventNames()).toStrictEqual([])
214 errorPool.emitter.on(PoolEvents.taskError, e => {
217 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
220 await errorPool.execute(data)
// The error observed by the caller must be an Error carrying the worker's message.
224 expect(inError).toBeDefined()
225 expect(inError).toBeInstanceOf(Error)
226 expect(inError.message).toBeDefined()
227 expect(typeof inError.message === 'string').toBe(true)
228 expect(inError.message).toBe('Error Message from ThreadWorker')
// The event payload is compared against the default task name and the same error.
229 expect(taskError).toStrictEqual({
230 name: DEFAULT_TASK_NAME,
231 message: new Error('Error Message from ThreadWorker'),
// Looks for a worker node whose failed-task counter is exactly 1.
235 errorPool.workerNodes.some(
236 workerNode => workerNode.usage.tasks.failed === 1
// Same checks as the sync error-handling case, run against `asyncErrorPool`
// (asyncErrorWorker.mjs) with the ':async' variant of the error message.
// NOTE(review): the declarations/assignments of `inError` and `taskError`, the body of the
// listener and what looks like a try/catch around execute() are not visible in this listing.
241 it('Verify that error handling is working properly:async', async () => {
242 const data = { f: 10 }
243 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
245 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
248 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
253 await asyncErrorPool.execute(data)
// The error observed by the caller must be an Error carrying the worker's message.
257 expect(inError).toBeDefined()
258 expect(inError).toBeInstanceOf(Error)
259 expect(inError.message).toBeDefined()
260 expect(typeof inError.message === 'string').toBe(true)
261 expect(inError.message).toBe('Error Message from ThreadWorker:async')
// The event payload is compared against the default task name and the same error.
262 expect(taskError).toStrictEqual({
263 name: DEFAULT_TASK_NAME,
264 message: new Error('Error Message from ThreadWorker:async'),
// Looks for a worker node whose failed-task counter is exactly 1.
268 asyncErrorPool.workerNodes.some(
269 workerNode => workerNode.usage.tasks.failed === 1
// Executes a task on `asyncPool` (asyncWorker.mjs), expects the input to be returned and
// the call to take at least 2000 as measured with performance.now().
// NOTE(review): the 2000 threshold presumably mirrors a delay inside asyncWorker.mjs —
// that file is not visible here, so confirm before changing either value.
274 it('Verify that async function is working properly', async () => {
275 const data = { f: 10 }
276 const startTime = performance.now()
277 const result = await asyncPool.execute(data)
278 const usedTime = performance.now() - startTime
279 expect(result).toStrictEqual(data)
280 expect(usedTime).toBeGreaterThanOrEqual(2000)
// Checks the shutdown of the suite-level `pool`: every worker emits 'exit', the pool is
// no longer started, it holds no worker nodes, and the `destroy` event fires once.
// NOTE(review): the declaration of `poolDestroy` and the call that actually destroys the
// pool (embedded lines 286 and 289-292) are not visible in this listing.
283 it('Shutdown test', async () => {
// Start waiting for one 'exit' event per worker before the pool is shut down.
284 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
// Order-dependent: the `busy` listener registered by the 'busy' event test is expected
// to still be attached to `pool`.
285 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
287 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
288 expect(pool.emitter.eventNames()).toStrictEqual([
293 const numberOfExitEvents = await exitPromise
294 expect(pool.started).toBe(false)
295 expect(pool.workerNodes.length).toBe(0)
296 expect(numberOfExitEvents).toBe(numberOfThreads)
297 expect(poolDestroy).toBe(1)
// Checks `opts.workerOptions`: undefined when the pool is built without options, and equal
// to the supplied worker options otherwise.
// NOTE(review): only the `env` entry of the options object is visible here; other entries
// and any cleanup of the two local pools (embedded lines 304, 306, 308-310, 313-316) are
// missing from this listing.
300 it('Verify that thread pool options are checked', async () => {
301 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
302 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
303 expect(pool.opts.workerOptions).toBeUndefined()
305 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
307 env: { TEST: 'test' },
311 expect(pool.opts.workerOptions).toStrictEqual({
312 env: { TEST: 'test' },
// Builds a local pool with only the thread count and worker file (no options object) and
// expects execute() to return { ok: 1 }.
// NOTE(review): the cleanup statement announced by the trailing comment (embedded line 324)
// is not visible in this listing.
318 it('Should work even without opts in input', async () => {
319 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
320 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
321 const res = await pool.execute()
322 expect(res).toStrictEqual({ ok: 1 })
323 // We need to clean up the resources after our test
// Destroys the worker node at index 0 of a local pool and expects the promise to resolve
// with undefined, one 'exit' event to be observed, and one worker node fewer in the pool.
// NOTE(review): the declaration of `exitEvent`, the body of the 'exit' listener and any
// cleanup of the local pool (embedded lines 331, 333-334, 338) are not visible here.
327 it('Verify destroyWorkerNode()', async () => {
328 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
329 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
330 const workerNodeKey = 0
332 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
335 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
336 expect(exitEvent).toBe(1)
337 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
// Constructing a FixedThreadPool with 0 workers is expected to throw with the message below.
// NOTE(review): the opening `expect(` of this assertion (embedded line 342) is not visible
// in this listing.
341 it('Verify that a pool with zero worker fails', () => {
343 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
344 ).toThrowError('Cannot instantiate a fixed pool with zero worker')