]> Piment Noir Git Repositories - poolifier.git/blame_incremental - src/worker/thread-worker.ts
Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / worker / thread-worker.ts
... / ...
CommitLineData
1import {
2 isMainThread,
3 type MessagePort,
4 parentPort,
5 threadId,
6} from 'node:worker_threads'
7
8import type { MessageValue } from '../utility-types.js'
9import type { TaskFunction, TaskFunctions } from './task-functions.js'
10import type { WorkerOptions } from './worker-options.js'
11
12import { AbortError } from './abort-error.js'
13import { AbstractWorker } from './abstract-worker.js'
14
/**
 * Thread worker implementation used by a poolifier `ThreadPool`.
 *
 * Once this worker has been inactive for longer than the configured `maxInactiveTime`,
 * it sends a termination request to its main thread.
 *
 * With a `DynamicThreadPool`, the extra workers that were created get terminated,
 * while the minimum number of workers stays guaranteed.
 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
 * @typeParam Response - Type of response the worker sends back to the main thread. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class ThreadWorker<
  Data = unknown,
  Response = unknown
> extends AbstractWorker<MessagePort, Data, Response> {
  /** @inheritDoc */
  protected get id (): number {
    // The worker id is the `threadId` exported by `node:worker_threads`.
    return threadId
  }

  /**
   * Port over which this worker exchanges messages with the main worker.
   * It stays unset until a matching ready message delivers one.
   */
  private port?: MessagePort

  /**
   * Builds a new poolifier thread worker.
   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked.
   * @param opts - Options for the worker.
   */
  public constructor (
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
    opts: WorkerOptions = {}
  ) {
    super(isMainThread, parentPort, taskFunctions, opts)
  }

  /**
   * @inheritDoc
   */
  protected handleError (error: Error): {
    aborted: boolean
    error: Error
    message: string
    stack?: string
  } {
    // Flag errors that are instances of `AbortError`, and carry the original
    // error object along with its message and stack.
    const aborted = error instanceof AbortError
    const { message, stack } = error
    return { aborted, error, message, stack }
  }

  /** @inheritDoc */
  protected override handleKillMessage (message: MessageValue<Data>): void {
    super.handleKillMessage(message)
    // After the base-class handling, unref and close the port if one was set.
    const activePort = this.port
    if (activePort != null) {
      activePort.unref()
      activePort.close()
    }
  }

  /** @inheritDoc */
  protected handleReadyMessage (message: MessageValue<Data>): void {
    // Only react to a message addressed to this worker that has `ready`
    // strictly `false` and carries a port.
    if (
      message.workerId !== this.id ||
      message.ready !== false ||
      message.port == null
    ) {
      return
    }
    // Sends the ready status together with the listed task function properties.
    const reportReadiness = (ready: boolean): void => {
      this.sendToMainWorker({
        ready,
        taskFunctionsProperties: this.listTaskFunctionsProperties(),
      })
    }
    try {
      const receivedPort = message.port
      this.port = receivedPort
      receivedPort.on('message', this.messageListener.bind(this))
      reportReadiness(true)
    } catch {
      // Any failure while wiring the port or acknowledging is reported as not ready.
      reportReadiness(false)
    }
  }

  /** @inheritDoc */
  protected readonly sendToMainWorker = (
    message: MessageValue<Response>
  ): void => {
    const targetPort = this.port
    // Without a port nothing is built or sent.
    if (targetPort == null) {
      return
    }
    // Every outgoing message is stamped with this worker's id.
    targetPort.postMessage({
      ...message,
      workerId: this.id,
    } satisfies MessageValue<Response>)
  }
}