fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / tests / pools / thread / fixed.test.mjs
1 import { expect } from 'expect'
2 import { FixedThreadPool, PoolEvents } from '../../../lib/index.js'
3 import { TaskFunctions } from '../../test-types.js'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.js'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.js'
6
7 describe('Fixed thread pool test suite', () => {
// Sizing shared by every pool created in this suite.
const numberOfThreads = 6
const tasksConcurrency = 2
// Error handler reused by the pools below: prints the worker error to stderr.
const logWorkerError = e => console.error(e)

// General purpose pool; it is destroyed by the 'Shutdown test' case.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  { errorHandler: logWorkerError }
)
// Same worker file, with the tasks queue enabled and a per-worker concurrency.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.mjs',
  {
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: tasksConcurrency },
    errorHandler: logWorkerError
  }
)
// Pool used by the test expecting an undefined task result.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.mjs',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)
// Pool used by the test expecting the task data to be returned as is.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.mjs'
)
// Pools used by the sync and async error handling tests.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.mjs',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.mjs',
  { errorHandler: logWorkerError }
)
// Pool used by the async task function test.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.mjs'
)
55
after('Destroy all pools', async () => {
  // Clean up the resources after the tests: the suite-level pools are
  // destroyed sequentially, in the order listed here.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const poolToDestroy of poolsToDestroy) {
    await poolToDestroy.destroy()
  }
})
65
it('Verify that the function is executed in a worker thread', async () => {
  // Run the two tasks one after the other and check each returned value.
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
76
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no argument at all.
  expect(await pool.execute()).toStrictEqual({ ok: 1 })
})
81
it("Verify that 'ready' event is emitted", async () => {
  // Dedicated pool: the listener must be registered before the pool is ready.
  const pool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.mjs',
    {
      errorHandler: e => console.error(e)
    }
  )
  try {
    expect(pool.emitter.eventNames()).toStrictEqual([])
    let poolReady = 0
    pool.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool, PoolEvents.ready, 1)
    expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
    expect(poolReady).toBe(1)
  } finally {
    // Destroy the pool even when an assertion above throws, so that its
    // worker threads are not leaked by a failing test.
    await pool.destroy()
  }
})
98
it("Verify that 'busy' event is emitted", async () => {
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let busyEventsCount = 0
  pool.emitter.on(PoolEvents.busy, () => {
    ++busyEventsCount
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  // Submit twice as many tasks as there are workers, all at once.
  const submittedTasks = Array.from({ length: numberOfThreads * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(busyEventsCount).toBe(numberOfThreads + 1)
})
113
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasksCount = numberOfThreads * maxMultiplier
  const pendingTasks = new Set(
    Array.from({ length: submittedTasksCount }, () => queuePool.execute())
  )
  expect(pendingTasks.size).toBe(submittedTasksCount)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  const queuedPerWorker = maxMultiplier - concurrency

  // Expectations checked right after submission, before awaiting the tasks.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBe(queuedPerWorker)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.sequentiallyStolen).toBe(0)
    expect(usage.tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(pendingTasks)

  // Expectations checked once every submitted task has settled. The stealing
  // counters are only bounded, not pinned to an exact value.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.executing).toBeLessThanOrEqual(submittedTasksCount)
    expect(usage.tasks.executed).toBe(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(queuedPerWorker)
    expect(usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
      submittedTasksCount
    )
    expect(usage.tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(usage.tasks.stolen).toBeLessThanOrEqual(submittedTasksCount)
  }
  expect(queuePool.info.executedTasks).toBe(submittedTasksCount)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(submittedTasksCount)
})
179
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task executed by emptyPool is expected to resolve to undefined.
  expect(await emptyPool.execute()).toBeUndefined()
})
184
it('Verify that data are sent to the worker correctly', async () => {
  // The task executed by echoPool is expected to resolve to its input data.
  const data = { f: 10 }
  expect(await echoPool.execute(data)).toStrictEqual(data)
})
190
it('Verify that transferable objects are sent to the worker correctly', async () => {
  let error
  let result
  // Valid transfer list: the task must complete without error.
  try {
    result = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1
    ])
  } catch (e) {
    error = e
  }
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toBeUndefined()

  // Reset the captured state so that the assertions below only reflect the
  // second call and cannot pass on values left over from the first one.
  result = undefined
  error = undefined
  // Invalid transfer list: the call must fail and yield no result.
  try {
    result = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    error = e
  }
  expect(result).toBeUndefined()
  expect(error).toBeInstanceOf(Error)
  expect(error.message).toMatch(
    /Found invalid (object|value) in transferList/
  )
})
217
it('Verify that error handling is working properly:sync', async () => {
  const data = { f: 10 }
  expect(errorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool `taskError` event.
  let emittedTaskError
  errorPool.emitter.on(PoolEvents.taskError, payload => {
    emittedTaskError = payload
  })
  expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
  // Capture the error raised by the task execution.
  let caughtError
  try {
    await errorPool.execute(data)
  } catch (err) {
    caughtError = err
  }
  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message).toBe('string')
  expect(caughtError.message).toBe('Error Message from ThreadWorker')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker'),
    data
  })
  // At least one worker node must have accounted for exactly one failed task.
  const hasWorkerWithOneFailure = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
248
it('Verify that error handling is working properly:async', async () => {
  const data = { f: 10 }
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
  // Capture the payload of the pool `taskError` event.
  let emittedTaskError
  asyncErrorPool.emitter.on(PoolEvents.taskError, payload => {
    emittedTaskError = payload
  })
  expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
    PoolEvents.taskError
  ])
  // Capture the error raised by the task execution.
  let caughtError
  try {
    await asyncErrorPool.execute(data)
  } catch (err) {
    caughtError = err
  }
  expect(caughtError).toBeDefined()
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toBeDefined()
  expect(typeof caughtError.message).toBe('string')
  expect(caughtError.message).toBe('Error Message from ThreadWorker:async')
  expect(emittedTaskError).toStrictEqual({
    name: DEFAULT_TASK_NAME,
    message: new Error('Error Message from ThreadWorker:async'),
    data
  })
  // At least one worker node must have accounted for exactly one failed task.
  const hasWorkerWithOneFailure = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
281
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  // Measure the wall-clock time, in milliseconds, spent awaiting the task.
  const startedAt = performance.now()
  const result = await asyncPool.execute(data)
  const elapsed = performance.now() - startedAt
  expect(result).toStrictEqual(data)
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
290
it('Shutdown test', async () => {
  // Start waiting for the worker 'exit' events before destroying the pool.
  const workersExited = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // The `busy` listener registered by the 'busy' event test is still attached.
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
  let destroyEventsCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    ++destroyEventsCount
  })
  expect(pool.emitter.eventNames()).toStrictEqual([
    PoolEvents.busy,
    PoolEvents.destroy
  ])
  await pool.destroy()
  const exitEventsCount = await workersExited
  expect(pool.started).toBe(false)
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)
  expect(exitEventsCount).toBe(numberOfThreads)
  expect(destroyEventsCount).toBe(1)
})
308
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  // Without options, no worker options are expected on the pool.
  const defaultOptionsPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(defaultOptionsPool.opts.workerOptions).toBeUndefined()
  } finally {
    // Destroy the pool even when the assertion fails, so that its worker
    // threads are not leaked.
    await defaultOptionsPool.destroy()
  }
  // With worker options, they are expected to be kept as given.
  const customOptionsPool = new FixedThreadPool(
    numberOfThreads,
    workerFilePath,
    {
      workerOptions: {
        env: { TEST: 'test' },
        name: 'test'
      }
    }
  )
  try {
    expect(customOptionsPool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test'
    })
  } finally {
    await customOptionsPool.destroy()
  }
})
326
it('Should work even without opts in input', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const res = await pool.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // task or the assertion above fails.
    await pool.destroy()
  }
})
335
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    const workerNodeKey = 0
    // Count the 'exit' events emitted by the worker about to be destroyed.
    let exitEvent = 0
    pool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
    expect(exitEvent).toBe(1)
    expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
  } finally {
    // Destroy the remaining worker nodes even when an assertion above fails,
    // so that their threads are not leaked.
    await pool.destroy()
  }
})
349
it('Verify that a pool with zero worker fails', () => {
  // The constructor itself is expected to throw, so it is wrapped in a function.
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
  expect(createZeroWorkerPool).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
355 })