fix: check worker aliveness in all case
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fc3e6586
JB
1import {
2 isMainThread,
3 MessageChannel,
4 SHARE_ENV,
5 Worker
6} from 'node:worker_threads'
deb85c12 7import type { Draft, MessageValue } from '../../utility-types'
c97c7edb 8import { AbstractPool } from '../abstract-pool'
bdaf31cd 9import type { PoolOptions } from '../pool'
7c0ba920 10import { PoolType } from '../pool-internal'
4ade5f1f 11
729c563d
S
12/**
13 * A thread worker with message channels for communication between main thread and thread worker.
14 */
c97c7edb 15export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
16
17/**
729c563d
S
18 * A thread pool with a fixed number of threads.
19 *
20 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
21 *
22 * This pool selects the threads in a round robin fashion.
4ade5f1f 23 *
38e795c1
JB
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
25 * @typeParam Response - Type of response of execution. This can only be serializable data.
4ade5f1f
S
26 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
27 * @since 0.0.1
28 */
d3c8a1a8 29export class FixedThreadPool<
deb85c12
JB
30 Data = unknown,
31 Response = unknown
d3c8a1a8 32> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
4ade5f1f 33 /**
729c563d
S
34 * Constructs a new poolifier fixed thread pool.
35 *
38e795c1
JB
36 * @param numberOfThreads - Number of threads for this pool.
37 * @param filePath - Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
38 * @param opts - Options for this fixed thread pool.
4ade5f1f
S
39 */
40 public constructor (
5c5a1fb7 41 numberOfThreads: number,
c97c7edb 42 filePath: string,
1927ee67 43 opts: PoolOptions<ThreadWorkerWithMessageChannel> = {}
4ade5f1f 44 ) {
5c5a1fb7 45 super(numberOfThreads, filePath, opts)
c97c7edb 46 }
4ade5f1f 47
38e795c1 48 /** {@inheritDoc} */
c97c7edb
S
49 protected isMain (): boolean {
50 return isMainThread
4ade5f1f
S
51 }
52
38e795c1 53 /** {@inheritDoc} */
a35560ba 54 public async destroyWorker (
c97c7edb
S
55 worker: ThreadWorkerWithMessageChannel
56 ): Promise<void> {
cefac5ba 57 this.sendToWorker(worker, { kill: 1 })
c97c7edb 58 await worker.terminate()
4ade5f1f
S
59 }
60
38e795c1 61 /** {@inheritDoc} */
c97c7edb
S
62 protected sendToWorker (
63 worker: ThreadWorkerWithMessageChannel,
64 message: MessageValue<Data>
65 ): void {
66 worker.postMessage(message)
4ade5f1f
S
67 }
68
38e795c1 69 /** {@inheritDoc} */
78cea37e 70 public registerWorkerMessageListener<Message extends Data | Response>(
4f7fa42a
S
71 messageChannel: ThreadWorkerWithMessageChannel,
72 listener: (message: MessageValue<Message>) => void
c97c7edb 73 ): void {
4f7fa42a 74 messageChannel.port2?.on('message', listener)
4ade5f1f
S
75 }
76
38e795c1 77 /** {@inheritDoc} */
280c2a77 78 protected createWorker (): ThreadWorkerWithMessageChannel {
c97c7edb 79 return new Worker(this.filePath, {
4ade5f1f
S
80 env: SHARE_ENV
81 })
c97c7edb
S
82 }
83
38e795c1 84 /** {@inheritDoc} */
280c2a77 85 protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
4ade5f1f
S
86 const { port1, port2 } = new MessageChannel()
87 worker.postMessage({ parent: port1 }, [port1])
88 worker.port1 = port1
89 worker.port2 = port2
a05c10de 90 // Listen to worker messages.
be0676b3 91 this.registerWorkerMessageListener(worker, super.workerListener())
4ade5f1f 92 }
7c0ba920 93
38e795c1 94 /** {@inheritDoc} */
7c0ba920
JB
95 public get type (): PoolType {
96 return PoolType.FIXED
97 }
98
38e795c1 99 /** {@inheritDoc} */
7c0ba920
JB
100 public get busy (): boolean {
101 return this.internalGetBusyStatus()
102 }
4ade5f1f 103}