Clean worker from pool after it was destroyed (#146)
authorShinigami <chrissi92@hotmail.de>
Sat, 13 Feb 2021 23:05:23 +0000 (00:05 +0100)
committerGitHub <noreply@github.com>
Sat, 13 Feb 2021 23:05:23 +0000 (00:05 +0100)
Co-authored-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
tests/pools/cluster/fixed.test.js

index 6e4465edd7a262af497cf8e9b84942ca9b80d4f6..f6ed280cc3bbcf39b6a4aed1faa37603bf17786d 100644 (file)
@@ -24,6 +24,7 @@ export interface IWorker {
   on(event: 'error', handler: ErrorHandler<this>): void
   on(event: 'online', handler: OnlineHandler<this>): void
   on(event: 'exit', handler: ExitHandler<this>): void
+  once(event: 'exit', handler: ExitHandler<this>): void
 }
 
 /**
@@ -161,9 +162,7 @@ export abstract class AbstractPool<
   }
 
   public async destroy (): Promise<void> {
-    for (const worker of this.workers) {
-      await this.destroyWorker(worker)
-    }
+    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
   }
 
   /**
@@ -288,8 +287,8 @@ export abstract class AbstractPool<
 
     worker.on('error', this.opts.errorHandler ?? (() => {}))
     worker.on('online', this.opts.onlineHandler ?? (() => {}))
-    // TODO handle properly when a worker exit
     worker.on('exit', this.opts.exitHandler ?? (() => {}))
+    worker.once('exit', () => this.removeWorker(worker))
 
     this.workers.push(worker)
 
index 6d665cef067eb9ffe78ee3cb26591142e1941e18..432feb3abb6b91017540561efcaa4f48608869da 100644 (file)
@@ -68,7 +68,6 @@ export class DynamicClusterPool<
         if (message.kill) {
           this.sendToWorker(worker, { kill: 1 })
           void this.destroyWorker(worker)
-          this.removeWorker(worker)
         }
       })
       return worker
index 0d8021ebf08f0011142a442cbc21175def17db87..6620d5f3149fa42821dc00ac8721706bdc118950 100644 (file)
@@ -60,7 +60,6 @@ export class FixedClusterPool<
 
   protected destroyWorker (worker: Worker): void {
     worker.kill()
-    // FIXME: The tests are currently failing, so these must be changed first
   }
 
   protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
index c10333158002bdb7ec16d3aec688746d42e3273f..9ab6bf30a7764ff3d18abd5ca99d1896cd055842 100644 (file)
@@ -68,7 +68,6 @@ export class DynamicThreadPool<
         if (message.kill) {
           this.sendToWorker(worker, { kill: 1 })
           void this.destroyWorker(worker)
-          this.removeWorker(worker)
         }
       })
       return worker
index 74c14ff2a33b6dc61e906e19bfdbaa7699fbc50e..bfd6f016d0d4f08525d1c19435d4a138e2a404a0 100644 (file)
@@ -48,7 +48,6 @@ export class FixedThreadPool<
     worker: ThreadWorkerWithMessageChannel
   ): Promise<void> {
     await worker.terminate()
-    // FIXME: The tests are currently failing, so these must be changed first
   }
 
   protected sendToWorker (
index aae9127f9f354e79a41f2d3ac3eb3322d2ef3342..d22f697a5d3893863c5c60798cd8d77a7dd7e451 100644 (file)
@@ -120,7 +120,7 @@ describe('Fixed cluster pool test suite ', () => {
         closedWorkers++
       })
     })
-    pool.destroy()
+    await pool.destroy()
     await new Promise(resolve => setTimeout(resolve, 200))
     expect(closedWorkers).toBe(numberOfWorkers)
   })