Benchmarks and performance enhancements (#209)
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fa699c42 1import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
deb85c12 2import type { Draft, MessageValue } from '../../utility-types'
c97c7edb
S
3import type { PoolOptions } from '../abstract-pool'
4import { AbstractPool } from '../abstract-pool'
4ade5f1f 5
729c563d
S
/**
 * A `worker_threads` `Worker` that can additionally carry the ports of a
 * `MessageChannel`, used for communication between the main thread and the
 * thread worker.
 *
 * The ports are assigned to the worker after it has been created
 * (see `FixedThreadPool.afterWorkerSetup`).
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
10
/**
 * Thread pool holding a fixed number of worker threads.
 *
 * Tasks may be executed in sync or asynchronous mode, as you prefer.
 *
 * Threads are selected in a round robin fashion.
 *
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * Builds a new poolifier fixed thread pool.
   *
   * All arguments are forwarded unchanged to the base pool constructor.
   *
   * @param numberOfThreads Number of threads for this pool.
   * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
   * @param opts Options for this fixed thread pool. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    numberOfThreads: number,
    filePath: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
  ) {
    super(numberOfThreads, filePath, opts)
  }

  /**
   * Tells whether the current code runs on the main thread.
   *
   * @returns The `isMainThread` flag exported by `worker_threads`.
   */
  protected isMain (): boolean {
    return isMainThread
  }

  /**
   * Sends a `{ kill: 1 }` message to the worker, then waits for the thread to terminate.
   *
   * @inheritdoc
   */
  public async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    const killRequest: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killRequest)
    await worker.terminate()
  }

  /**
   * Posts a message directly to the given worker thread.
   *
   * @param worker The worker that receives the message.
   * @param message The message to post.
   */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /**
   * Subscribes the listener to `'message'` events on the worker's `port2`.
   * Nothing is registered when `port2` has not been set on the worker.
   *
   * @inheritdoc
   */
  public registerWorkerMessageListener<Message extends Data | Response> (
    messageChannel: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Message>) => void
  ): void {
    const receivingPort = messageChannel.port2
    if (receivingPort != null) {
      receivingPort.on('message', listener)
    }
  }

  /**
   * Spawns a worker thread running the file at `this.filePath`.
   * `SHARE_ENV` is passed as the worker's `env` option.
   *
   * @returns The newly created worker.
   */
  protected createWorker (): ThreadWorkerWithMessageChannel {
    return new Worker(this.filePath, {
      env: SHARE_ENV
    })
  }

  /**
   * Wires a freshly created worker to a dedicated `MessageChannel`.
   *
   * `port1` is transferred to the worker inside a `{ parent: port1 }` message,
   * both ports are stored on the worker object, and the base class worker
   * listener is registered through `registerWorkerMessageListener`.
   *
   * @param worker The worker to set up.
   */
  protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
    const channel = new MessageChannel()
    worker.postMessage({ parent: channel.port1 }, [channel.port1])
    worker.port1 = channel.port1
    worker.port2 = channel.port2
    // A listener is attached for every task and removed once the task completes;
    // the max listeners limit is raised so that this does not trigger warnings.
    const listenerLimit = this.opts.maxTasks ?? 1000
    channel.port2.setMaxListeners(listenerLimit)
    this.registerWorkerMessageListener(worker, super.workerListener())
  }
}