Only allow primitive JSON for transfer between worker and main worker (#128)
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fa699c42 1import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
d3c8a1a8 2import type { Draft, JSONValue, MessageValue } from '../../utility-types'
c97c7edb
S
3import type { PoolOptions } from '../abstract-pool'
4import { AbstractPool } from '../abstract-pool'
4ade5f1f 5
c97c7edb 6export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
7
8/**
9 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
10 *
11 * This pool will select the worker thread in a round robin fashion.
12 *
13 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
14 * @since 0.0.1
15 */
d3c8a1a8
S
16export class FixedThreadPool<
17 Data extends JSONValue = JSONValue,
18 Response extends JSONValue = JSONValue
19> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
4ade5f1f
S
20 /**
21 * @param numThreads Num of threads for this worker pool.
22 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
23 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
24 */
25 public constructor (
c97c7edb
S
26 numThreads: number,
27 filePath: string,
28 opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
4ade5f1f 29 ) {
c97c7edb
S
30 super(numThreads, filePath, opts)
31 }
4ade5f1f 32
c97c7edb
S
33 protected isMain (): boolean {
34 return isMainThread
4ade5f1f
S
35 }
36
c97c7edb
S
37 protected async destroyWorker (
38 worker: ThreadWorkerWithMessageChannel
39 ): Promise<void> {
40 await worker.terminate()
4ade5f1f
S
41 }
42
c97c7edb
S
43 protected sendToWorker (
44 worker: ThreadWorkerWithMessageChannel,
45 message: MessageValue<Data>
46 ): void {
47 worker.postMessage(message)
4ade5f1f
S
48 }
49
c97c7edb
S
50 protected registerWorkerMessageListener (
51 port: ThreadWorkerWithMessageChannel,
52 listener: (message: MessageValue<Response>) => void
53 ): void {
54 port.port2?.on('message', listener)
4ade5f1f
S
55 }
56
c97c7edb
S
57 protected unregisterWorkerMessageListener (
58 port: ThreadWorkerWithMessageChannel,
59 listener: (message: MessageValue<Response>) => void
60 ): void {
61 port.port2?.removeListener('message', listener)
4ade5f1f
S
62 }
63
c97c7edb
S
64 protected newWorker (): ThreadWorkerWithMessageChannel {
65 return new Worker(this.filePath, {
4ade5f1f
S
66 env: SHARE_ENV
67 })
c97c7edb
S
68 }
69
70 protected afterNewWorkerPushed (
71 worker: ThreadWorkerWithMessageChannel
72 ): void {
4ade5f1f
S
73 const { port1, port2 } = new MessageChannel()
74 worker.postMessage({ parent: port1 }, [port1])
75 worker.port1 = port1
76 worker.port2 = port2
77 // we will attach a listener for every task,
78 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
ee99693b 79 worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
4ade5f1f
S
80 }
81}