From: Jérôme Benoit Date: Sun, 9 Oct 2022 18:27:37 +0000 (+0200) Subject: Enhance tasks statistics X-Git-Tag: v2.3.1~56 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=bf9549aef8a23a3931e19040dadb7f1e8e6422b5;p=poolifier.git Enhance tasks statistics Close #577 Signed-off-by: Jérôme Benoit --- diff --git a/README.md b/README.md index a292ce84..2db54d26 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ You can implement a worker-threads worker in a simple way by extending the class 'use strict' const { ThreadWorker } = require('poolifier') -function yourFunction (data) { +function yourFunction(data) { // this will be executed in the worker thread, // the data will be received by using the execute method return { ok: 1 } @@ -194,7 +194,7 @@ This method will call the terminate method on each worker. The last active time of your worker unit will be updated when a task is submitted to a worker or when a worker terminate a task. If `killBehavior` is set to `KillBehaviors.HARD` this value represents also the timeout for the tasks that you submit to the pool, when this timeout expires your tasks is interrupted and the worker is killed if is not part of the minimum size of the pool. If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. - Default: 60.000 ms + Default: 60000 ms - `async` - true/false, true if your function contains async pieces else false - `killBehavior` - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 10fe1281..0921f456 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -6,7 +6,7 @@ import { EMPTY_FUNCTION } from '../utils' import { isKillBehavior, KillBehaviors } from '../worker/worker-options' import type { AbstractPoolWorker } from './abstract-pool-worker' import type { PoolOptions } from './pool' -import type { IPoolInternal } from './pool-internal' +import type { IPoolInternal, TasksUsage } from './pool-internal' import { PoolEmitter, PoolType } from './pool-internal' import { WorkerChoiceStrategies, @@ -14,6 +14,9 @@ import { } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +const WORKER_NOT_FOUND_TASKS_USAGE_MAP = + 'Worker could not be found in worker tasks usage map' + /** * Base class containing some shared logic for all poolifier pools. * @@ -29,8 +32,16 @@ export abstract class AbstractPool< /** @inheritDoc */ public readonly workers: Worker[] = [] - /** @inheritDoc */ - public readonly tasks: Map = new Map() + /** + * The workers tasks usage map. + * + * `key`: The `Worker` + * `value`: Worker tasks usage statistics. + */ + protected workersTasksUsage: Map = new Map< + Worker, + TasksUsage + >() /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -151,14 +162,19 @@ export abstract class AbstractPool< return this.promiseMap.size } + /** @inheritDoc */ + public getWorkerIndex (worker: Worker): number { + return this.workers.indexOf(worker) + } + /** @inheritDoc */ public getWorkerRunningTasks (worker: Worker): number | undefined { - return this.tasks.get(worker) + return this.workersTasksUsage.get(worker)?.running } /** @inheritDoc */ - public getWorkerIndex (worker: Worker): number { - return this.workers.indexOf(worker) + public getWorkerAverageTasksRunTime (worker: Worker): number | undefined { + return this.workersTasksUsage.get(worker)?.avgRunTime } /** @inheritDoc */ @@ -230,36 +246,29 @@ export abstract class AbstractPool< protected abstract isMain (): boolean /** - * Increase the number of tasks that the given worker has applied. + * Hook executed before the worker task promise resolution. + * Can be overridden. * - * @param worker Worker whose tasks are increased. + * @param worker The worker. */ - protected increaseWorkersTask (worker: Worker): void { - this.stepWorkerNumberOfTasks(worker, 1) + protected beforePromiseWorkerResponseHook (worker: Worker): void { + this.increaseWorkerRunningTasks(worker) } /** - * Decrease the number of tasks that the given worker has applied. + * Hook executed after the worker task promise resolution. + * Can be overridden. * - * @param worker Worker whose tasks are decreased. + * @param message The received message. + * @param promise The Promise response. */ - protected decreaseWorkersTasks (worker: Worker): void { - this.stepWorkerNumberOfTasks(worker, -1) - } - - /** - * Step the number of tasks that the given worker has applied. - * - * @param worker Worker whose tasks are set. - * @param step Worker number of tasks step. - */ - private stepWorkerNumberOfTasks (worker: Worker, step: number): void { - const numberOfTasksInProgress = this.tasks.get(worker) - if (numberOfTasksInProgress !== undefined) { - this.tasks.set(worker, numberOfTasksInProgress + step) - } else { - throw Error('Worker could not be found in tasks map') - } + protected afterPromiseWorkerResponseHook ( + message: MessageValue, + promise: PromiseWorkerResponseWrapper + ): void { + this.decreaseWorkerRunningTasks(promise.worker) + this.stepWorkerRunTasks(promise.worker, 1) + this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime) } /** @@ -270,7 +279,7 @@ export abstract class AbstractPool< protected removeWorker (worker: Worker): void { // Clean worker from data structure this.workers.splice(this.getWorkerIndex(worker), 1) - this.tasks.delete(worker) + this.resetWorkerTasksUsage(worker) } /** @@ -309,7 +318,7 @@ export abstract class AbstractPool< worker: Worker, messageId: number ): Promise { - this.increaseWorkersTask(worker) + this.beforePromiseWorkerResponseHook(worker) return new Promise((resolve, reject) => { this.promiseMap.set(messageId, { resolve, reject, worker }) }) @@ -345,8 +354,13 @@ export abstract class AbstractPool< this.workers.push(worker) - // Init tasks map - this.tasks.set(worker, 0) + // Init worker tasks usage map + this.workersTasksUsage.set(worker, { + run: 0, + running: 0, + runTime: 0, + avgRunTime: 0 + }) this.afterWorkerSetup(worker) @@ -363,7 +377,7 @@ export abstract class AbstractPool< if (message.id !== undefined) { const promise = this.promiseMap.get(message.id) if (promise !== undefined) { - this.decreaseWorkersTasks(promise.worker) + this.afterPromiseWorkerResponseHook(message, promise) if (message.error) promise.reject(message.error) else promise.resolve(message.data as Response) this.promiseMap.delete(message.id) @@ -377,4 +391,83 @@ export abstract class AbstractPool< this.emitter?.emit('busy') } } + + /** + * Increase the number of tasks that the given worker has applied. + * + * @param worker Worker which running tasks is increased. + */ + private increaseWorkerRunningTasks (worker: Worker): void { + this.stepWorkerRunningTasks(worker, 1) + } + + /** + * Decrease the number of tasks that the given worker has applied. + * + * @param worker Worker which running tasks is decreased. + */ + private decreaseWorkerRunningTasks (worker: Worker): void { + this.stepWorkerRunningTasks(worker, -1) + } + + /** + * Step the number of tasks that the given worker has applied. + * + * @param worker Worker which running tasks are stepped. + * @param step Number of running tasks step. + */ + private stepWorkerRunningTasks (worker: Worker, step: number): void { + const tasksUsage = this.workersTasksUsage.get(worker) + if (tasksUsage !== undefined) { + tasksUsage.running = tasksUsage.running + step + this.workersTasksUsage.set(worker, tasksUsage) + } else { + throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + } + } + + /** + * Step the number of tasks that the given worker has run. + * + * @param worker Worker which has run tasks. + * @param step Number of run tasks step. + */ + private stepWorkerRunTasks (worker: Worker, step: number) { + const tasksUsage = this.workersTasksUsage.get(worker) + if (tasksUsage !== undefined) { + tasksUsage.run = tasksUsage.run + step + this.workersTasksUsage.set(worker, tasksUsage) + } else { + throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + } + } + + /** + * Update tasks run time for the given worker. + * + * @param worker Worker which run the task. + * @param taskRunTime Worker task run time. + */ + private updateWorkerTasksRunTime ( + worker: Worker, + taskRunTime: number | undefined + ) { + const tasksUsage = this.workersTasksUsage.get(worker) + if (tasksUsage !== undefined && tasksUsage.run !== 0) { + tasksUsage.runTime += taskRunTime ?? 0 + tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run + this.workersTasksUsage.set(worker, tasksUsage) + } else { + throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + } + } + + /** + * Reset worker tasks usage statistics. + * + * @param worker The worker. + */ + private resetWorkerTasksUsage (worker: Worker): void { + this.workersTasksUsage.delete(worker) + } } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index 71120110..f5f1d190 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -10,6 +10,16 @@ export enum PoolType { DYNAMIC = 'dynamic' } +/** + * Tasks usage statistics. + */ +export interface TasksUsage { + run: number + running: number + runTime: number + avgRunTime: number +} + /** * Internal poolifier pool emitter. */ @@ -32,14 +42,6 @@ export interface IPoolInternal< */ readonly workers: Worker[] - /** - * The tasks map. - * - * - `key`: The `Worker` - * - `value`: Number of tasks currently in progress on the worker. - */ - readonly tasks: Map - /** * Emitter on which events can be listened to. * @@ -99,4 +101,12 @@ export interface IPoolInternal< * @returns The number of tasks currently running on the worker. */ getWorkerRunningTasks(worker: Worker): number | undefined + + /** + * Get worker average tasks run time. + * + * @param worker The worker. + * @returns The average tasks run time on the worker. + */ + getWorkerAverageTasksRunTime(worker: Worker): number | undefined } diff --git a/src/utility-types.ts b/src/utility-types.ts index c9c5492c..104725ac 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -31,6 +31,10 @@ export interface MessageValue< * Error. */ readonly error?: string + /** + * Task run time. + */ + readonly taskRunTime?: number /** * Reference to main worker. * diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index b49ede8b..349940fa 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -171,8 +171,10 @@ export abstract class AbstractWorker< value: MessageValue ): void { try { + const startTaskTimestamp = Date.now() const res = fn(value.data) - this.sendToMainWorker({ data: res, id: value.id }) + const taskRunTime = Date.now() - startTaskTimestamp + this.sendToMainWorker({ data: res, id: value.id, taskRunTime }) } catch (e) { const err = this.handleError(e as Error) this.sendToMainWorker({ error: err, id: value.id }) @@ -191,9 +193,11 @@ export abstract class AbstractWorker< fn: (data?: Data) => Promise, value: MessageValue ): void { + const startTaskTimestamp = Date.now() fn(value.data) .then(res => { - this.sendToMainWorker({ data: res, id: value.id }) + const taskRunTime = Date.now() - startTaskTimestamp + this.sendToMainWorker({ data: res, id: value.id, taskRunTime }) return null }) .catch(e => { diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index d9ab30fe..24c7debd 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -4,13 +4,16 @@ const { FixedThreadPool, WorkerChoiceStrategies } = require('../../../lib/index') -const expectedError = new Error('Worker could not be found in tasks map') const numberOfWorkers = 1 -class StubPoolWithTasksMapClear extends FixedThreadPool { +const workerNotFoundInTasksUsageMapError = new Error( + 'Worker could not be found in worker tasks usage map' +) + +class StubPoolWithWorkerTasksUsageMapClear extends FixedThreadPool { removeAllWorker () { - this.tasks.clear() + this.workersTasksUsage.clear() } } @@ -21,31 +24,6 @@ class StubPoolWithIsMainMethod extends FixedThreadPool { } describe('Abstract pool test suite', () => { - it('Simulate worker not found during increaseWorkersTask', () => { - const pool = new StubPoolWithTasksMapClear( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js' - ) - // Simulate worker not found. - pool.removeAllWorker() - expect(() => pool.increaseWorkersTask()).toThrowError(expectedError) - pool.destroy() - }) - - it('Simulate worker not found during decreaseWorkersTasks', () => { - const pool = new StubPoolWithTasksMapClear( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - errorHandler: e => console.error(e) - } - ) - // Simulate worker not found. - pool.removeAllWorker() - expect(() => pool.decreaseWorkersTasks()).toThrowError(expectedError) - pool.destroy() - }) - it('Simulate pool creation from a non main thread/process', () => { expect( () => @@ -139,6 +117,109 @@ describe('Abstract pool test suite', () => { pool.destroy() }) + it('Simulate worker not found during increaseWorkerRunningTasks', () => { + const pool = new StubPoolWithWorkerTasksUsageMapClear( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + // Simulate worker not found. + pool.removeAllWorker() + expect(() => pool.increaseWorkerRunningTasks()).toThrowError( + workerNotFoundInTasksUsageMapError + ) + pool.destroy() + }) + + it('Simulate worker not found during decreaseWorkerRunningTasks', () => { + const pool = new StubPoolWithWorkerTasksUsageMapClear( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + errorHandler: e => console.error(e) + } + ) + // Simulate worker not found. + pool.removeAllWorker() + expect(() => pool.decreaseWorkerRunningTasks()).toThrowError( + workerNotFoundInTasksUsageMapError + ) + pool.destroy() + }) + + it('Simulate worker not found during stepWorkerRunTasks', () => { + const pool = new StubPoolWithWorkerTasksUsageMapClear( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + errorHandler: e => console.error(e) + } + ) + // Simulate worker not found. + pool.removeAllWorker() + expect(() => pool.stepWorkerRunTasks()).toThrowError( + workerNotFoundInTasksUsageMapError + ) + pool.destroy() + }) + + it('Simulate worker not found during updateWorkerTasksRunTime', () => { + const pool = new StubPoolWithWorkerTasksUsageMapClear( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + errorHandler: e => console.error(e) + } + ) + // Simulate worker not found. + pool.removeAllWorker() + expect(() => pool.updateWorkerTasksRunTime()).toThrowError( + workerNotFoundInTasksUsageMapError + ) + pool.destroy() + }) + + it('Verify that worker pool tasks usage are initialized', () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + for (const tasksUsage of pool.workersTasksUsage.values()) { + expect(tasksUsage).toBeDefined() + expect(tasksUsage.run).toBe(0) + expect(tasksUsage.running).toBe(0) + expect(tasksUsage.runTime).toBe(0) + expect(tasksUsage.avgRunTime).toBe(0) + } + pool.destroy() + }) + + it('Verify that worker pool tasks usage are computed', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + const promises = [] + for (let i = 0; i < numberOfWorkers * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + for (const tasksUsage of pool.workersTasksUsage.values()) { + expect(tasksUsage).toBeDefined() + expect(tasksUsage.run).toBe(0) + expect(tasksUsage.running).toBe(numberOfWorkers * 2) + expect(tasksUsage.runTime).toBe(0) + expect(tasksUsage.avgRunTime).toBe(0) + } + await Promise.all(promises) + for (const tasksUsage of pool.workersTasksUsage.values()) { + expect(tasksUsage).toBeDefined() + expect(tasksUsage.run).toBe(numberOfWorkers * 2) + expect(tasksUsage.running).toBe(0) + expect(tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + } + pool.destroy() + }) + it("Verify that pool event emitter 'busy' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, diff --git a/tests/worker-files/cluster/testWorker.js b/tests/worker-files/cluster/testWorker.js index f6115cde..cba1ec8e 100644 --- a/tests/worker-files/cluster/testWorker.js +++ b/tests/worker-files/cluster/testWorker.js @@ -4,7 +4,7 @@ const { isMaster } = require('cluster') const TestUtils = require('../../test-utils') function test (data) { - TestUtils.jsonIntegerSerialization(50) + TestUtils.jsonIntegerSerialization(100) return isMaster } diff --git a/tests/worker-files/thread/testWorker.js b/tests/worker-files/thread/testWorker.js index ccf55b56..773e1116 100644 --- a/tests/worker-files/thread/testWorker.js +++ b/tests/worker-files/thread/testWorker.js @@ -4,7 +4,7 @@ const { isMainThread } = require('worker_threads') const TestUtils = require('../../test-utils') function test (data) { - TestUtils.jsonIntegerSerialization(50) + TestUtils.jsonIntegerSerialization(100) return isMainThread }