From 2a69b8c50a10229de607013f867d685ee594dba9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 12 Jul 2023 12:05:56 +0200 Subject: [PATCH] fix: fix worker task functions handling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 + src/pools/abstract-pool.ts | 8 +- src/worker/abstract-worker.ts | 25 +++-- tests/test-utils.js | 2 +- tests/worker/abstract-worker.test.js | 134 ++++++++++++++++++++++++++- 5 files changed, 155 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42e60206..ab698d75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [2.6.15] - 2023-07-11 +### Fixed + +- Fix pool starting semantic. +- Fix worker task functions handling. + ### Added - Take into account worker node readiness in worker choice strategies. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3728b619..7d3be8d6 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -939,9 +939,7 @@ export abstract class AbstractPool< const workerNodeKey = this.getWorkerNodeKey(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false - if (this.emitter != null) { - this.emitter.emit(PoolEvents.error, error) - } + this.emitter?.emit(PoolEvents.error, error) if (this.opts.restartWorkerOnError === true && !this.starting) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorker() @@ -1096,9 +1094,7 @@ export abstract class AbstractPool< const promiseResponse = this.promiseResponseMap.get(message.id as string) if (promiseResponse != null) { if (message.taskError != null) { - if (this.emitter != null) { - this.emitter.emit(PoolEvents.taskError, message.taskError) - } + this.emitter?.emit(PoolEvents.taskError, message.taskError) promiseResponse.reject(message.taskError.message) } else { promiseResponse.resolve(message.data as Response) diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 43cc4299..1b8a673c 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -119,7 +119,15 @@ export abstract class AbstractWorker< } this.taskFunctions = new Map>() if (typeof taskFunctions === 'function') { - this.taskFunctions.set(DEFAULT_TASK_NAME, taskFunctions.bind(this)) + const boundFn = taskFunctions.bind(this) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) + this.taskFunctions.set( + typeof taskFunctions.name === 'string' && + taskFunctions.name.trim().length > 0 + ? taskFunctions.name + : 'fn1', + boundFn + ) } else if (isPlainObject(taskFunctions)) { let firstEntry = true for (const [name, fn] of Object.entries(taskFunctions)) { @@ -133,9 +141,10 @@ export abstract class AbstractWorker< 'A taskFunctions parameter object value is not a function' ) } - this.taskFunctions.set(name, fn.bind(this)) + const boundFn = fn.bind(this) + this.taskFunctions.set(name, boundFn) if (firstEntry) { - this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) firstEntry = false } } @@ -189,14 +198,15 @@ export abstract class AbstractWorker< if (typeof fn !== 'function') { throw new TypeError('fn parameter is not a function') } + const boundFn = fn.bind(this) try { if ( this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME) ) { - this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this)) + this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn) } - this.taskFunctions.set(name, fn.bind(this)) + this.taskFunctions.set(name, boundFn) return true } catch { return false @@ -257,10 +267,7 @@ export abstract class AbstractWorker< try { this.taskFunctions.set( DEFAULT_TASK_NAME, - this.taskFunctions.get(name)?.bind(this) as WorkerFunction< - Data, - Response - > + this.taskFunctions.get(name) as WorkerFunction ) return true } catch { diff --git a/tests/test-utils.js b/tests/test-utils.js index 1fd59942..177a5c28 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -23,7 +23,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => { if (numberOfEventsToWait === 0) { resolve(events) } - pool.emitter.on(poolEvent, () => { + pool?.emitter.on(poolEvent, () => { ++events if (events === numberOfEventsToWait) { resolve(events) diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index bf3dc28e..429fff7d 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -82,6 +82,16 @@ describe('Abstract worker test suite', () => { ) }) + it('Verify that taskFunctions parameter with unique function is taken', () => { + const worker = new ThreadWorker(() => {}) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(2) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + }) + it('Verify that taskFunctions parameter with multiple task functions contains function', () => { const fn1 = () => { return 1 @@ -100,9 +110,13 @@ describe('Abstract worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) - expect(typeof worker.taskFunctions.get('default') === 'function').toBe(true) - expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true) - expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(3) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) }) it('Verify that handleError() method works properly', () => { @@ -119,4 +133,118 @@ describe('Abstract worker test suite', () => { new StubWorkerWithMainWorker(() => {}).getMainWorker() ).toThrowError('Main worker not set') }) + + it('Verify that hasTaskFunction() works', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const worker = new ClusterWorker({ fn1, fn2 }) + expect(worker.hasTaskFunction('default')).toBe(true) + expect(worker.hasTaskFunction('fn1')).toBe(true) + expect(worker.hasTaskFunction('fn2')).toBe(true) + expect(worker.hasTaskFunction('fn3')).toBe(false) + }) + + it('Verify that addTaskFunction() works', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const fn1Replacement = () => { + return 3 + } + const worker = new ThreadWorker(fn1) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(2) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + expect(() => worker.addTaskFunction('default', fn2)).toThrowError( + new Error('Cannot add a task function with the default reserved name') + ) + worker.addTaskFunction('fn2', fn2) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(3) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + worker.addTaskFunction('fn1', fn1Replacement) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(3) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + }) + + it('Verify that removeTaskFunction() works', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const worker = new ThreadWorker({ fn1, fn2 }) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(3) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + expect(() => worker.removeTaskFunction('default')).toThrowError( + new Error( + 'Cannot remove the task function with the default reserved name' + ) + ) + expect(() => worker.removeTaskFunction('fn1')).toThrowError( + new Error( + 'Cannot remove the task function used as the default task function' + ) + ) + worker.removeTaskFunction('fn2') + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeUndefined() + expect(worker.taskFunctions.size).toBe(2) + }) + + it('Verify that setDefaultTaskFunction() works', () => { + const fn1 = () => { + return 1 + } + const fn2 = () => { + return 2 + } + const worker = new ThreadWorker({ fn1, fn2 }) + expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) + expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) + expect(worker.taskFunctions.size).toBe(3) + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + expect(() => worker.setDefaultTaskFunction('default')).toThrowError( + new Error( + 'Cannot set the default task function reserved name as the default task function' + ) + ) + worker.setDefaultTaskFunction('fn1') + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn1') + ) + worker.setDefaultTaskFunction('fn2') + expect(worker.taskFunctions.get('default')).toStrictEqual( + worker.taskFunctions.get('fn2') + ) + }) }) -- 2.34.1