1 import { parentPort
} from
'worker_threads';
3 import type { JSONSchemaType
} from
'ajv';
4 import Ajv from
'ajv-draft-04';
5 import ajvFormats from
'ajv-formats';
7 import OCPPError from
'../../exception/OCPPError';
8 import PerformanceStatistics from
'../../performance/PerformanceStatistics';
9 import type { EmptyObject
} from
'../../types/EmptyObject';
10 import type { HandleErrorParams
} from
'../../types/Error';
11 import type { JsonObject
, JsonType
} from
'../../types/JsonType';
12 import { ErrorType
} from
'../../types/ocpp/ErrorType';
13 import { MessageType
} from
'../../types/ocpp/MessageType';
15 IncomingRequestCommand
,
20 } from
'../../types/ocpp/Requests';
21 import type { ErrorResponse
, Response
} from
'../../types/ocpp/Responses';
22 import Constants from
'../../utils/Constants';
23 import logger from
'../../utils/Logger';
24 import Utils from
'../../utils/Utils';
25 import type ChargingStation from
'../ChargingStation';
26 import { MessageChannelUtils
} from
'../MessageChannelUtils';
27 import type OCPPResponseService from
'./OCPPResponseService';
28 import { OCPPServiceUtils
} from
'./OCPPServiceUtils';
30 const moduleName
= 'OCPPRequestService';
32 export default abstract class OCPPRequestService
{
33 private static instance
: OCPPRequestService
| null = null;
36 private readonly ocppResponseService
: OCPPResponseService
;
38 protected constructor(ocppResponseService
: OCPPResponseService
) {
39 this.ocppResponseService
= ocppResponseService
;
42 this.requestHandler
.bind(this);
43 this.sendResponse
.bind(this);
44 this.sendError
.bind(this);
45 this.internalSendMessage
.bind(this);
46 this.buildMessageToSend
.bind(this);
47 this.validateRequestPayload
.bind(this);
50 public static getInstance
<T
extends OCPPRequestService
>(
51 this: new (ocppResponseService
: OCPPResponseService
) => T
,
52 ocppResponseService
: OCPPResponseService
54 if (OCPPRequestService
.instance
=== null) {
55 OCPPRequestService
.instance
= new this(ocppResponseService
);
57 return OCPPRequestService
.instance
as T
;
60 public async sendResponse(
61 chargingStation
: ChargingStation
,
63 messagePayload
: JsonType
,
64 commandName
: IncomingRequestCommand
65 ): Promise
<ResponseType
> {
67 // Send response message
68 return await this.internalSendMessage(
72 MessageType
.CALL_RESULT_MESSAGE
,
76 this.handleRequestError(chargingStation
, commandName
, error
as Error);
80 public async sendError(
81 chargingStation
: ChargingStation
,
84 commandName
: RequestCommand
| IncomingRequestCommand
85 ): Promise
<ResponseType
> {
88 return await this.internalSendMessage(
92 MessageType
.CALL_ERROR_MESSAGE
,
96 this.handleRequestError(chargingStation
, commandName
, error
as Error);
100 protected async sendMessage(
101 chargingStation
: ChargingStation
,
103 messagePayload
: JsonType
,
104 commandName
: RequestCommand
,
105 params
: RequestParams
= {
106 skipBufferingOnError
: false,
107 triggerMessage
: false,
109 ): Promise
<ResponseType
> {
111 return await this.internalSendMessage(
115 MessageType
.CALL_MESSAGE
,
120 this.handleRequestError(chargingStation
, commandName
, error
as Error, { throwError
: false });
124 protected validateRequestPayload
<T
extends JsonType
>(
125 chargingStation
: ChargingStation
,
126 commandName
: RequestCommand
,
127 schema
: JSONSchemaType
<T
>,
130 if (!chargingStation
.getPayloadSchemaValidation()) {
133 const validate
= this.ajv
.compile(schema
);
134 if (validate(payload
)) {
138 `${chargingStation.logPrefix()} ${moduleName}.validateRequestPayload: Request PDU is invalid: %j`,
142 OCPPServiceUtils
.ajvErrorsToErrorType(validate
.errors
),
143 'Request PDU is invalid',
145 JSON
.stringify(validate
.errors
, null, 2)
149 private async internalSendMessage(
150 chargingStation
: ChargingStation
,
152 messagePayload
: JsonType
| OCPPError
,
153 messageType
: MessageType
,
154 commandName
?: RequestCommand
| IncomingRequestCommand
,
155 params
: RequestParams
= {
156 skipBufferingOnError
: false,
157 triggerMessage
: false,
159 ): Promise
<ResponseType
> {
161 (chargingStation
.isInUnknownState() && commandName
=== RequestCommand
.BOOT_NOTIFICATION
) ||
162 (!chargingStation
.getOcppStrictCompliance() && chargingStation
.isInUnknownState()) ||
163 chargingStation
.isInAcceptedState() ||
164 (chargingStation
.isInPendingState() &&
165 (params
.triggerMessage
|| messageType
=== MessageType
.CALL_RESULT_MESSAGE
))
167 // eslint-disable-next-line @typescript-eslint/no-this-alias
169 // Send a message through wsConnection
170 return Utils
.promiseWithTimeout(
171 new Promise((resolve
, reject
) => {
172 const messageToSend
= this.buildMessageToSend(
181 if (chargingStation
.getEnableStatistics()) {
182 chargingStation
.performanceStatistics
.addRequestStatistic(commandName
, messageType
);
184 // Check if wsConnection opened
185 if (chargingStation
.isWebSocketConnectionOpened()) {
187 const beginId
= PerformanceStatistics
.beginMeasure(commandName
);
188 // FIXME: Handle sending error
189 chargingStation
.wsConnection
.send(messageToSend
);
190 PerformanceStatistics
.endMeasure(commandName
, beginId
);
192 `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${this.getMessageTypeString(
194 )} payload: ${messageToSend}`
196 } else if (!params
.skipBufferingOnError
) {
198 chargingStation
.bufferMessage(messageToSend
);
199 const ocppError
= new OCPPError(
200 ErrorType
.GENERIC_ERROR
,
201 `WebSocket closed for buffered message id '${messageId}' with content '${messageToSend}'`,
203 (messagePayload
as JsonObject
)?.details
?? {}
205 if (messageType
=== MessageType
.CALL_MESSAGE
) {
206 // Reject it but keep the request in the cache
207 return reject(ocppError
);
209 return errorCallback(ocppError
, false);
212 return errorCallback(
214 ErrorType
.GENERIC_ERROR
,
215 `WebSocket closed for non buffered message id '${messageId}' with content '${messageToSend}'`,
217 (messagePayload
as JsonObject
)?.details
?? {}
223 if (messageType
!== MessageType
.CALL_MESSAGE
) {
225 return resolve(messagePayload
);
229 * Function that will receive the request's response
232 * @param requestPayload
234 async function responseCallback(
236 requestPayload
: JsonType
238 if (chargingStation
.getEnableStatistics()) {
239 chargingStation
.performanceStatistics
.addRequestStatistic(
241 MessageType
.CALL_RESULT_MESSAGE
244 // Handle the request's response
246 await self.ocppResponseService
.responseHandler(
248 commandName
as RequestCommand
,
256 chargingStation
.requests
.delete(messageId
);
257 // parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(chargingStation));
262 * Function that will receive the request's error response
265 * @param requestStatistic
267 function errorCallback(error
: OCPPError
, requestStatistic
= true): void {
268 if (requestStatistic
&& chargingStation
.getEnableStatistics()) {
269 chargingStation
.performanceStatistics
.addRequestStatistic(
271 MessageType
.CALL_ERROR_MESSAGE
275 `${chargingStation.logPrefix()} Error occurred when calling command ${commandName} with message data ${JSON.stringify(
280 chargingStation
.requests
.delete(messageId
);
281 // parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(chargingStation));
285 Constants
.OCPP_WEBSOCKET_TIMEOUT
,
287 ErrorType
.GENERIC_ERROR
,
288 `Timeout for message id '${messageId}'`,
290 (messagePayload
as JsonObject
)?.details
?? {}
293 messageType
=== MessageType
.CALL_MESSAGE
&& chargingStation
.requests
.delete(messageId
);
298 ErrorType
.SECURITY_ERROR
,
299 `Cannot send command ${commandName} PDU when the charging station is in ${chargingStation.getRegistrationStatus()} state on the central server`,
304 private buildMessageToSend(
305 chargingStation
: ChargingStation
,
307 messagePayload
: JsonType
| OCPPError
,
308 messageType
: MessageType
,
309 commandName
?: RequestCommand
| IncomingRequestCommand
,
310 responseCallback
?: (payload
: JsonType
, requestPayload
: JsonType
) => Promise
<void>,
311 errorCallback
?: (error
: OCPPError
, requestStatistic
?: boolean) => void
313 let messageToSend
: string;
315 switch (messageType
) {
317 case MessageType
.CALL_MESSAGE
:
319 chargingStation
.requests
.set(messageId
, [
323 messagePayload
as JsonType
,
325 messageToSend
= JSON
.stringify([
330 ] as OutgoingRequest
);
333 case MessageType
.CALL_RESULT_MESSAGE
:
335 messageToSend
= JSON
.stringify([messageType
, messageId
, messagePayload
] as Response
);
338 case MessageType
.CALL_ERROR_MESSAGE
:
339 // Build Error Message
340 messageToSend
= JSON
.stringify([
343 (messagePayload
as OCPPError
)?.code
?? ErrorType
.GENERIC_ERROR
,
344 (messagePayload
as OCPPError
)?.message
?? '',
345 (messagePayload
as OCPPError
)?.details
?? { commandName
},
349 return messageToSend
;
352 private getMessageTypeString(messageType
: MessageType
): string {
353 switch (messageType
) {
354 case MessageType
.CALL_MESSAGE
:
356 case MessageType
.CALL_RESULT_MESSAGE
:
358 case MessageType
.CALL_ERROR_MESSAGE
:
363 private handleRequestError(
364 chargingStation
: ChargingStation
,
365 commandName
: RequestCommand
| IncomingRequestCommand
,
367 params
: HandleErrorParams
<EmptyObject
> = { throwError
: true }
369 logger
.error(`${chargingStation.logPrefix()} Request command ${commandName} error:`, error
);
370 if (params
?.throwError
) {
375 // eslint-disable-next-line @typescript-eslint/no-unused-vars
376 public abstract requestHandler
<RequestType
extends JsonType
, ResponseType
extends JsonType
>(
377 chargingStation
: ChargingStation
,
378 commandName
: RequestCommand
,
379 commandParams
?: JsonType
,
380 params
?: RequestParams
381 ): Promise
<ResponseType
>;