fix: fix OCPP message sending promise leak
[e-mobility-charging-stations-simulator.git] / src / charging-station / ocpp / OCPPRequestService.ts
1 import Ajv, { type JSONSchemaType, type ValidateFunction } from 'ajv';
2 import ajvFormats from 'ajv-formats';
3
4 import { OCPPConstants } from './OCPPConstants';
5 import type { OCPPResponseService } from './OCPPResponseService';
6 import { OCPPServiceUtils } from './OCPPServiceUtils';
7 import type { ChargingStation } from '../../charging-station';
8 import { OCPPError } from '../../exception';
9 import { PerformanceStatistics } from '../../performance';
10 import {
11 type ErrorCallback,
12 type ErrorResponse,
13 ErrorType,
14 type IncomingRequestCommand,
15 type JsonObject,
16 type JsonType,
17 MessageType,
18 type OCPPVersion,
19 type OutgoingRequest,
20 RequestCommand,
21 type RequestParams,
22 type Response,
23 type ResponseCallback,
24 type ResponseType,
25 } from '../../types';
26 import { Constants, cloneObject, handleSendMessageError, logger } from '../../utils';
27
28 const moduleName = 'OCPPRequestService';
29
30 const defaultRequestParams: RequestParams = {
31 skipBufferingOnError: false,
32 triggerMessage: false,
33 throwError: false,
34 };
35
36 export abstract class OCPPRequestService {
/** Single cached service instance, created lazily by getInstance(). */
private static instance: OCPPRequestService | null = null;
/** Ajv validator used to compile the JSON schemas into validation functions. */
private readonly ajv: Ajv;
/** Compiled request payload validators, filled on demand and keyed by command. */
private readonly jsonValidateFunctions: Map<RequestCommand, ValidateFunction<JsonType>>;
/** Response service handling request responses and holding the response schemas. */
private readonly ocppResponseService: OCPPResponseService;
/** OCPP version passed at construction. */
private readonly version: OCPPVersion;
/** Request payload JSON schemas keyed by command; provided by the concrete subclass. */
protected abstract jsonSchemas: Map<RequestCommand, JSONSchemaType<JsonType>>;
43
/**
 * Initializes the state shared by every concrete request service.
 *
 * Stores the version and the response service, creates an empty cache of compiled request
 * validators, sets up an Ajv instance (accepting the `javaType` keyword, with formats
 * registered through ajv-formats) and rebinds the instance methods to this object.
 *
 * @param version - OCPP version stored on the instance
 * @param ocppResponseService - response service stored on the instance
 */
protected constructor(version: OCPPVersion, ocppResponseService: OCPPResponseService) {
  this.version = version;
  this.ocppResponseService = ocppResponseService;
  this.jsonValidateFunctions = new Map<RequestCommand, ValidateFunction<JsonType>>();
  const ajv = new Ajv({ keywords: ['javaType'], multipleOfPrecision: 2 });
  ajvFormats(ajv);
  this.ajv = ajv;
  // Rebind the methods so that `this` stays attached when they are invoked detached from
  // the instance (presumably when handed over as callbacks - confirm against callers).
  this.requestHandler = this.requestHandler.bind(this);
  this.sendMessage = this.sendMessage.bind(this);
  this.sendResponse = this.sendResponse.bind(this);
  this.sendError = this.sendError.bind(this);
  this.internalSendMessage = this.internalSendMessage.bind(this);
  this.buildMessageToSend = this.buildMessageToSend.bind(this);
  this.validateRequestPayload = this.validateRequestPayload.bind(this);
  this.validateIncomingRequestResponsePayload =
    this.validateIncomingRequestResponsePayload.bind(this);
}
110
/**
 * Returns the cached service instance, building it with the calling class on first use.
 *
 * NOTE(review): the cache is one static slot on the base class, so the first concrete
 * subclass to call this decides what every later caller receives - presumably only one
 * subclass is used per process; confirm.
 *
 * @param ocppResponseService - response service handed to the constructor on first use
 * @returns the cached instance, typed as the calling class
 */
public static getInstance<T extends OCPPRequestService>(
  this: new (ocppResponseService: OCPPResponseService) => T,
  ocppResponseService: OCPPResponseService,
): T {
  if (OCPPRequestService.instance !== null) {
    return OCPPRequestService.instance as T;
  }
  const created = new this(ocppResponseService);
  OCPPRequestService.instance = created;
  return created;
}
120
/**
 * Sends the response (CALL_RESULT message) to an incoming request.
 *
 * Any sending failure is passed to handleSendMessageError() with `throwError: true`.
 *
 * @param chargingStation - charging station sending the response
 * @param messageId - id of the message being answered
 * @param messagePayload - response payload
 * @param commandName - incoming request command being answered
 * @returns the value produced by the message sending, or null if the error handler returns
 */
public async sendResponse(
  chargingStation: ChargingStation,
  messageId: string,
  messagePayload: JsonType,
  commandName: IncomingRequestCommand,
): Promise<ResponseType> {
  let sent: ResponseType = null;
  try {
    sent = await this.internalSendMessage(
      chargingStation,
      messageId,
      messagePayload,
      MessageType.CALL_RESULT_MESSAGE,
      commandName,
    );
  } catch (error) {
    handleSendMessageError(chargingStation, commandName, error as Error, {
      throwError: true,
    });
  }
  return sent;
}
143
/**
 * Sends an OCPP error (CALL_ERROR message).
 *
 * Any sending failure is passed to handleSendMessageError() with its default options.
 *
 * @param chargingStation - charging station sending the error
 * @param messageId - id of the message the error relates to
 * @param ocppError - error to send
 * @param commandName - command the error relates to
 * @returns the value produced by the message sending, or null if the error handler returns
 */
public async sendError(
  chargingStation: ChargingStation,
  messageId: string,
  ocppError: OCPPError,
  commandName: RequestCommand | IncomingRequestCommand,
): Promise<ResponseType> {
  let sent: ResponseType = null;
  try {
    sent = await this.internalSendMessage(
      chargingStation,
      messageId,
      ocppError,
      MessageType.CALL_ERROR_MESSAGE,
      commandName,
    );
  } catch (error) {
    handleSendMessageError(chargingStation, commandName, error as Error);
  }
  return sent;
}
164
/**
 * Sends a request (CALL message).
 *
 * The given options are layered over the module defaults. Any sending failure is passed
 * to handleSendMessageError() together with the resulting `throwError` option.
 *
 * @param chargingStation - charging station sending the request
 * @param messageId - id of the request message
 * @param messagePayload - request payload
 * @param commandName - request command
 * @param params - sending options overriding the defaults
 * @returns the value produced by the message sending, or null if the error handler returns
 */
protected async sendMessage(
  chargingStation: ChargingStation,
  messageId: string,
  messagePayload: JsonType,
  commandName: RequestCommand,
  params?: RequestParams,
): Promise<ResponseType> {
  const requestParams: RequestParams = { ...defaultRequestParams, ...params };
  let response: ResponseType = null;
  try {
    response = await this.internalSendMessage(
      chargingStation,
      messageId,
      messagePayload,
      MessageType.CALL_MESSAGE,
      commandName,
      requestParams,
    );
  } catch (error) {
    handleSendMessageError(chargingStation, commandName, error as Error, {
      throwError: requestParams.throwError,
    });
  }
  return response;
}
192
/**
 * Validates a request payload against the JSON schema registered for its command.
 *
 * Validation is skipped (returning true) when the station has `ocppStrictCompliance`
 * set to false, or when no schema is registered for the command (a warning is logged).
 * The check runs on a clone with dates converted to ISO strings, so the caller's payload
 * is left untouched.
 *
 * @param chargingStation - charging station issuing the request
 * @param commandName - command whose schema is looked up
 * @param payload - request payload to validate
 * @returns true when the payload is valid or validation is skipped
 * @throws OCPPError when the payload does not match the schema
 */
private validateRequestPayload<T extends JsonType>(
  chargingStation: ChargingStation,
  commandName: RequestCommand | IncomingRequestCommand,
  payload: T,
): boolean {
  const strictCompliance = chargingStation.stationInfo?.ocppStrictCompliance;
  if (strictCompliance === false) {
    return true;
  }
  const requestCommand = commandName as RequestCommand;
  if (!this.jsonSchemas.has(requestCommand)) {
    logger.warn(
      `${chargingStation.logPrefix()} ${moduleName}.validateRequestPayload: No JSON schema found for command '${commandName}' PDU validation`,
    );
    return true;
  }
  const validatePdu = this.getJsonRequestValidateFunction<T>(requestCommand);
  const pdu = cloneObject<T>(payload);
  OCPPServiceUtils.convertDateToISOString<T>(pdu);
  if (!validatePdu(pdu)) {
    logger.error(
      `${chargingStation.logPrefix()} ${moduleName}.validateRequestPayload: Command '${commandName}' request PDU is invalid: %j`,
      validatePdu.errors,
    );
    // OCPPError usage here is debatable: it's an error in the OCPP stack but not targeted to sendError().
    throw new OCPPError(
      OCPPServiceUtils.ajvErrorsToErrorType(validatePdu.errors),
      'Request PDU is invalid',
      commandName,
      JSON.stringify(validatePdu.errors, undefined, 2),
    );
  }
  return true;
}
225
/**
 * Returns the compiled validator for a request command, compiling and caching it on
 * first use.
 *
 * The compiled function is cached as is, without `bind()`: Ajv reports the errors of the
 * last run on the compiled function object itself, and a bound copy is a distinct
 * function object that never receives them, which would leave `errors` undefined for
 * the caller.
 *
 * @param commandName - request command; a schema is expected to be registered for it
 * @returns the validation function for the command
 */
private getJsonRequestValidateFunction<T extends JsonType>(commandName: RequestCommand) {
  let validate = this.jsonValidateFunctions.get(commandName);
  if (validate === undefined) {
    // Callers check that the schema exists before asking for its validator.
    validate = this.ajv.compile<T>(this.jsonSchemas.get(commandName)!);
    this.jsonValidateFunctions.set(commandName, validate);
  }
  return validate;
}
235
/**
 * Validates the response payload of an incoming request against the JSON schema
 * registered for its command on the response service.
 *
 * Validation is skipped (returning true) when the station has `ocppStrictCompliance`
 * set to false, or when no schema is registered for the command (a warning is logged).
 * The check runs on a clone with dates converted to ISO strings, so the caller's payload
 * is left untouched.
 *
 * @param chargingStation - charging station sending the response
 * @param commandName - incoming request command whose response schema is looked up
 * @param payload - response payload to validate
 * @returns true when the payload is valid or validation is skipped
 * @throws OCPPError when the payload does not match the schema
 */
private validateIncomingRequestResponsePayload<T extends JsonType>(
  chargingStation: ChargingStation,
  commandName: RequestCommand | IncomingRequestCommand,
  payload: T,
): boolean {
  if (chargingStation.stationInfo?.ocppStrictCompliance === false) {
    return true;
  }
  if (
    this.ocppResponseService.jsonIncomingRequestResponseSchemas.has(
      commandName as IncomingRequestCommand,
    ) === false
  ) {
    logger.warn(
      `${chargingStation.logPrefix()} ${moduleName}.validateIncomingRequestResponsePayload: No JSON schema found for command '${commandName}' PDU validation`,
    );
    return true;
  }
  const validate = this.getJsonRequestResponseValidateFunction<T>(
    commandName as IncomingRequestCommand,
  );
  // Validate a converted clone so the payload that gets sent keeps its original values.
  payload = cloneObject<T>(payload);
  OCPPServiceUtils.convertDateToISOString<T>(payload);
  if (validate(payload)) {
    return true;
  }
  logger.error(
    `${chargingStation.logPrefix()} ${moduleName}.validateIncomingRequestResponsePayload: Command '${commandName}' response PDU is invalid: %j`,
    validate.errors,
  );
  // OCPPError usage here is debatable: it's an error in the OCPP stack but not targeted to sendError().
  throw new OCPPError(
    OCPPServiceUtils.ajvErrorsToErrorType(validate.errors),
    'Response PDU is invalid',
    commandName,
    JSON.stringify(validate.errors, undefined, 2),
  );
}
274
/**
 * Returns the compiled validator for the response of an incoming request command,
 * compiling it and caching it on the response service on first use.
 *
 * The compiled function is cached as is, without `bind()`: Ajv reports the errors of the
 * last run on the compiled function object itself, and a bound copy is a distinct
 * function object that never receives them, which would leave `errors` undefined for
 * the caller.
 *
 * @param commandName - incoming request command; a response schema is expected to be
 *   registered for it
 * @returns the validation function for the command response
 */
private getJsonRequestResponseValidateFunction<T extends JsonType>(
  commandName: IncomingRequestCommand,
) {
  const validateFunctions = this.ocppResponseService.jsonIncomingRequestResponseValidateFunctions;
  let validate = validateFunctions.get(commandName);
  if (validate === undefined) {
    // Callers check that the schema exists before asking for its validator.
    validate = this.ajv.compile<T>(
      this.ocppResponseService.jsonIncomingRequestResponseSchemas.get(commandName)!,
    );
    validateFunctions.set(commandName, validate);
  }
  return validate;
}
291
/**
 * Builds one OCPP message and sends it over the charging station WebSocket connection.
 *
 * Sending is allowed when the station is in accepted state; in unknown state for a
 * BootNotification or when `ocppStrictCompliance` is false; or in pending state for a
 * triggered message or a CALL_RESULT message. Otherwise an OCPPError is thrown.
 *
 * For a CALL message the request is stored in the station request cache and the returned
 * promise settles from the cached callbacks, i.e. once a response or an error is handed
 * to them. For the other message types the promise resolves with the payload right after
 * the send call has been issued.
 *
 * On a closed connection or a send error the message is buffered on the station unless
 * `skipBufferingOnError` is set, and the promise is rejected.
 *
 * @param chargingStation - charging station sending the message
 * @param messageId - id of the message
 * @param messagePayload - payload, or the OCPP error for a CALL_ERROR message
 * @param messageType - type of the message to send
 * @param commandName - command the message relates to
 * @param params - sending options overriding the defaults
 * @returns a promise of the response payload (CALL message) or of the sent payload
 * @throws OCPPError when the station state does not allow sending the message
 */
private async internalSendMessage(
  chargingStation: ChargingStation,
  messageId: string,
  messagePayload: JsonType | OCPPError,
  messageType: MessageType,
  commandName: RequestCommand | IncomingRequestCommand,
  params?: RequestParams,
): Promise<ResponseType> {
  const mergedParams: RequestParams = { ...defaultRequestParams, ...params };
  const sendingAllowed =
    (chargingStation.inUnknownState() === true &&
      commandName === RequestCommand.BOOT_NOTIFICATION) ||
    (chargingStation.stationInfo?.ocppStrictCompliance === false &&
      chargingStation.inUnknownState() === true) ||
    chargingStation.inAcceptedState() === true ||
    (chargingStation.inPendingState() === true &&
      (mergedParams.triggerMessage === true || messageType === MessageType.CALL_RESULT_MESSAGE));
  if (!sendingAllowed) {
    throw new OCPPError(
      ErrorType.SECURITY_ERROR,
      `Cannot send command ${commandName} PDU when the charging station is in ${chargingStation.getRegistrationStatus()} state on the central server`,
      commandName,
    );
  }
  const isCallMessage = messageType === MessageType.CALL_MESSAGE;
  // The merged options object is local and never mutated, so the flag can be read once.
  const bufferOnError = mergedParams.skipBufferingOnError === false;
  // Send a message through wsConnection
  return new Promise<ResponseType>((resolve, reject) => {
    // Read lazily: the payload details are looked up when the error is actually built.
    const payloadDetails = () =>
      (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT;

    /**
     * Receives the response to the request, runs the response handler and settles the
     * promise; the request leaves the cache once the handler is done.
     *
     * @param payload - response payload
     * @param requestPayload - payload of the request being answered
     */
    const onResponse = (payload: JsonType, requestPayload: JsonType): void => {
      if (chargingStation.stationInfo?.enableStatistics === true) {
        chargingStation.performanceStatistics?.addRequestStatistic(
          commandName,
          MessageType.CALL_RESULT_MESSAGE,
        );
      }
      this.ocppResponseService
        .responseHandler(chargingStation, commandName as RequestCommand, payload, requestPayload)
        .then(() => {
          resolve(payload);
        })
        .catch((error) => {
          reject(error);
        })
        .finally(() => {
          chargingStation.requests.delete(messageId);
        });
    };

    /**
     * Receives the error response to the request: logs it, removes the request from the
     * cache and rejects the promise.
     *
     * @param ocppError - error to reject with
     * @param requestStatistic - whether to account the error in the statistics
     */
    const onError = (ocppError: OCPPError, requestStatistic = true): void => {
      if (requestStatistic === true && chargingStation.stationInfo?.enableStatistics === true) {
        chargingStation.performanceStatistics?.addRequestStatistic(
          commandName,
          MessageType.CALL_ERROR_MESSAGE,
        );
      }
      logger.error(
        `${chargingStation.logPrefix()} Error occurred at ${OCPPServiceUtils.getMessageTypeString(
          messageType,
        )} command ${commandName} with PDU %j:`,
        messagePayload,
        ocppError,
      );
      chargingStation.requests.delete(messageId);
      reject(ocppError);
    };

    // Rejects the promise; a request is additionally removed from the cache.
    const failSending = (ocppError: OCPPError): void => {
      if (isCallMessage) {
        onError(ocppError, false);
      } else {
        reject(ocppError);
      }
    };

    if (chargingStation.stationInfo?.enableStatistics === true) {
      chargingStation.performanceStatistics?.addRequestStatistic(commandName, messageType);
    }
    const messageToSend = this.buildMessageToSend(
      chargingStation,
      messageId,
      messagePayload,
      messageType,
      commandName,
    );

    // wsConnection not opened
    if (chargingStation.isWebSocketConnectionOpened() !== true) {
      const closedError = new OCPPError(
        ErrorType.GENERIC_ERROR,
        `WebSocket closed for ${
          bufferOnError ? '' : 'non '
        }buffered message id '${messageId}' with content '${messageToSend}'`,
        commandName,
        payloadDetails(),
      );
      if (!bufferOnError) {
        failSending(closedError);
        return;
      }
      // Buffer
      chargingStation.bufferMessage(messageToSend);
      if (isCallMessage) {
        this.cacheRequestPromise(
          chargingStation,
          messageId,
          messagePayload as JsonType,
          commandName,
          onResponse,
          onError,
        );
      }
      // Reject and keep request in the cache
      reject(closedError);
      return;
    }

    // wsConnection opened
    const beginId = PerformanceStatistics.beginMeasure(commandName);
    const sendTimeout = setTimeout(() => {
      failSending(
        new OCPPError(
          ErrorType.GENERIC_ERROR,
          `Timeout for message id '${messageId}'`,
          commandName,
          payloadDetails(),
        ),
      );
    }, OCPPConstants.OCPP_WEBSOCKET_TIMEOUT);
    chargingStation.wsConnection?.send(messageToSend, (error?: Error) => {
      clearTimeout(sendTimeout);
      if (!error) {
        return;
      }
      const wsError = new OCPPError(
        ErrorType.GENERIC_ERROR,
        `WebSocket errored for ${
          bufferOnError ? '' : 'non '
        }buffered message id '${messageId}' with content '${messageToSend}'`,
        commandName,
        { name: error.name, message: error.message, stack: error.stack },
      );
      if (bufferOnError) {
        // Buffer
        chargingStation.bufferMessage(messageToSend);
        // Reject and keep request in the cache
        reject(wsError);
      } else {
        failSending(wsError);
      }
    });
    if (isCallMessage) {
      this.cacheRequestPromise(
        chargingStation,
        messageId,
        messagePayload as JsonType,
        commandName,
        onResponse,
        onError,
      );
    }
    logger.debug(
      `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${OCPPServiceUtils.getMessageTypeString(
        messageType,
      )} payload: ${messageToSend}`,
    );
    PerformanceStatistics.endMeasure(commandName, beginId);
    // Resolve response
    if (!isCallMessage) {
      resolve(messagePayload);
    }
  });
}
480
/**
 * Serializes an OCPP message to the JSON array string put on the wire.
 *
 * Request and response payloads are validated first, so a validation failure throws
 * before anything is serialized. Error messages fall back to a generic error code, an
 * empty message and `{ commandName }` details when the error lacks them.
 *
 * @param chargingStation - charging station sending the message
 * @param messageId - id of the message
 * @param messagePayload - payload, or the OCPP error for a CALL_ERROR message
 * @param messageType - type of the message to build
 * @param commandName - command the message relates to
 * @returns the serialized message
 * @throws OCPPError when a request or response payload fails validation
 */
private buildMessageToSend(
  chargingStation: ChargingStation,
  messageId: string,
  messagePayload: JsonType | OCPPError,
  messageType: MessageType,
  commandName: RequestCommand | IncomingRequestCommand,
): string {
  switch (messageType) {
    // Request
    case MessageType.CALL_MESSAGE:
      this.validateRequestPayload(chargingStation, commandName, messagePayload as JsonType);
      return JSON.stringify([
        messageType,
        messageId,
        commandName,
        messagePayload,
      ] as OutgoingRequest);
    // Response
    case MessageType.CALL_RESULT_MESSAGE:
      this.validateIncomingRequestResponsePayload(
        chargingStation,
        commandName,
        messagePayload as JsonType,
      );
      return JSON.stringify([messageType, messageId, messagePayload] as Response);
    // Error Message
    case MessageType.CALL_ERROR_MESSAGE: {
      const ocppError = messagePayload as OCPPError;
      return JSON.stringify([
        messageType,
        messageId,
        ocppError?.code ?? ErrorType.GENERIC_ERROR,
        ocppError?.message ?? '',
        ocppError?.details ?? { commandName },
      ] as ErrorResponse);
    }
  }
}
526
/**
 * Stores a pending request in the charging station request cache under its message id,
 * together with the callbacks that settle it.
 *
 * @param chargingStation - charging station owning the request cache
 * @param messageId - id of the request message, used as cache key
 * @param messagePayload - request payload
 * @param commandName - command of the request
 * @param responseCallback - callback stored for the request response
 * @param errorCallback - callback stored for the request error
 */
private cacheRequestPromise(
  chargingStation: ChargingStation,
  messageId: string,
  messagePayload: JsonType,
  commandName: RequestCommand | IncomingRequestCommand,
  responseCallback: ResponseCallback,
  errorCallback: ErrorCallback,
): void {
  const { requests: pendingRequests } = chargingStation;
  pendingRequests.set(messageId, [responseCallback, errorCallback, commandName, messagePayload]);
}
542
/**
 * Handles an outgoing request for the given command; the concrete subclass provides the
 * implementation.
 *
 * @param chargingStation - charging station on whose behalf the request is issued
 * @param commandName - request command
 * @param commandParams - command parameters (typed loosely for now, see the FIXME below)
 * @param params - sending options
 * @returns a promise of the response, typed as ResType
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
public abstract requestHandler<ReqType extends JsonType, ResType extends JsonType>(
  chargingStation: ChargingStation,
  commandName: RequestCommand,
  // FIXME: should be ReqType
  commandParams?: JsonType,
  params?: RequestParams,
): Promise<ResType>;
551 }