log the exact data for failed transmissions, not the opcode
[deb_libcec.git] / src / lib / adapter / Pulse-Eight / USBCECAdapterMessageQueue.cpp
1 /*
2 * This file is part of the libCEC(R) library.
3 *
4 * libCEC(R) is Copyright (C) 2011-2012 Pulse-Eight Limited. All rights reserved.
5 * libCEC(R) is an original work, containing original code.
6 *
7 * libCEC(R) is a trademark of Pulse-Eight Limited.
8 *
9 * This program is dual-licensed; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 *
23 *
24 * Alternatively, you can license this library under a commercial license,
25 * please contact Pulse-Eight Licensing for more information.
26 *
27 * For more information contact:
28 * Pulse-Eight Licensing <license@pulse-eight.com>
29 * http://www.pulse-eight.com/
30 * http://www.pulse-eight.net/
31 */
32
33 #include "env.h"
34 #include "USBCECAdapterMessageQueue.h"
35
36 #include "USBCECAdapterCommunication.h"
37 #include "USBCECAdapterMessage.h"
38 #include "lib/platform/sockets/socket.h"
39 #include "lib/LibCEC.h"
40 #include "lib/platform/util/StdString.h"
41
42 using namespace CEC;
43 using namespace PLATFORM;
44 using namespace std;
45
46 #define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
47
48 CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue *queue, CCECAdapterMessage *message) :
49 m_queue(queue),
50 m_message(message),
51 m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
52 m_bSucceeded(false),
53 m_bWaiting(true),
54 m_queueTimeout(message->transmit_timeout) {}
55
56 CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { }
57
58 void CCECAdapterMessageQueueEntry::Broadcast(void)
59 {
60 CLockObject lock(m_mutex);
61 m_condition.Broadcast();
62 }
63
64 bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message)
65 {
66 bool bHandled(false);
67
68 if (IsResponse(message))
69 {
70 switch (message.Message())
71 {
72 case MSGCODE_COMMAND_ACCEPTED:
73 bHandled = MessageReceivedCommandAccepted(message);
74 break;
75 case MSGCODE_TRANSMIT_SUCCEEDED:
76 bHandled = MessageReceivedTransmitSucceeded(message);
77 break;
78 default:
79 bHandled = MessageReceivedResponse(message);
80 break;
81 }
82 }
83
84 return bHandled;
85 }
86
87 void CCECAdapterMessageQueueEntry::Signal(void)
88 {
89 CLockObject lock(m_mutex);
90 m_bSucceeded = true;
91 m_condition.Signal();
92 }
93
94 bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout)
95 {
96 bool bReturn(false);
97 /* wait until we receive a signal when the tranmission succeeded */
98 {
99 CLockObject lock(m_mutex);
100 bReturn = m_bSucceeded ? true : m_condition.Wait(m_mutex, m_bSucceeded, iTimeout);
101 m_bWaiting = false;
102 }
103 return bReturn;
104 }
105
106 bool CCECAdapterMessageQueueEntry::IsWaiting(void)
107 {
108 CLockObject lock(m_mutex);
109 return m_bWaiting;
110 }
111
112 cec_adapter_messagecode CCECAdapterMessageQueueEntry::MessageCode(void)
113 {
114 return m_message->Message();
115 }
116
117 bool CCECAdapterMessageQueueEntry::IsResponseOld(const CCECAdapterMessage &msg)
118 {
119 cec_adapter_messagecode msgCode = msg.Message();
120
121 return msgCode == MessageCode() ||
122 msgCode == MSGCODE_COMMAND_ACCEPTED ||
123 msgCode == MSGCODE_COMMAND_REJECTED ||
124 (m_message->IsTranmission() && (msgCode == MSGCODE_TIMEOUT_ERROR ||
125 msgCode == MSGCODE_HIGH_ERROR ||
126 msgCode == MSGCODE_LOW_ERROR ||
127 msgCode == MSGCODE_RECEIVE_FAILED ||
128 msgCode == MSGCODE_TRANSMIT_FAILED_LINE ||
129 msgCode == MSGCODE_TRANSMIT_FAILED_ACK ||
130 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA ||
131 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE ||
132 msgCode == MSGCODE_TRANSMIT_SUCCEEDED));
133 }
134
135 bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage &msg)
136 {
137 cec_adapter_messagecode thisMsgCode = m_message->Message();
138 cec_adapter_messagecode msgCode = msg.Message();
139 cec_adapter_messagecode msgResponse = msg.ResponseTo();
140
141 // msgcode matches, always a response
142 if (msgCode == MessageCode())
143 return true;
144
145 if (!ProvidesExtendedResponse())
146 return IsResponseOld(msg);
147
148 // response without a msgcode
149 if (msgResponse == MSGCODE_NOTHING)
150 return false;
151
152 // commands that only repond with accepted/rejected
153 if (thisMsgCode == MSGCODE_PING ||
154 thisMsgCode == MSGCODE_SET_ACK_MASK ||
155 thisMsgCode == MSGCODE_SET_CONTROLLED ||
156 thisMsgCode == MSGCODE_SET_AUTO_ENABLED ||
157 thisMsgCode == MSGCODE_SET_DEFAULT_LOGICAL_ADDRESS ||
158 thisMsgCode == MSGCODE_SET_LOGICAL_ADDRESS_MASK ||
159 thisMsgCode == MSGCODE_SET_PHYSICAL_ADDRESS ||
160 thisMsgCode == MSGCODE_SET_DEVICE_TYPE ||
161 thisMsgCode == MSGCODE_SET_HDMI_VERSION ||
162 thisMsgCode == MSGCODE_SET_OSD_NAME ||
163 thisMsgCode == MSGCODE_WRITE_EEPROM ||
164 thisMsgCode == MSGCODE_TRANSMIT_IDLETIME)
165 return thisMsgCode == msgResponse;
166
167 if (!m_message->IsTranmission())
168 {
169 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "FIXME! not a transmission");
170 return false;
171 }
172
173 return ((msgCode == MSGCODE_COMMAND_ACCEPTED || msgCode == MSGCODE_COMMAND_REJECTED) &&
174 (msgResponse == MSGCODE_TRANSMIT_ACK_POLARITY || msgResponse == MSGCODE_TRANSMIT || msgResponse == MSGCODE_TRANSMIT_EOM)) ||
175 msgCode == MSGCODE_TIMEOUT_ERROR ||
176 msgCode == MSGCODE_RECEIVE_FAILED ||
177 msgCode == MSGCODE_TRANSMIT_FAILED_ACK ||
178 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA ||
179 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE ||
180 msgCode == MSGCODE_TRANSMIT_SUCCEEDED;
181 }
182
183 const char *CCECAdapterMessageQueueEntry::ToString(void) const
184 {
185 /* CEC transmissions got the 'set ack polarity' msgcode, which doesn't look nice */
186 if (m_message->IsTranmission())
187 return "CEC transmission";
188 else
189 return CCECAdapterMessage::ToString(m_message->Message());
190 }
191
192 bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message)
193 {
194 bool bSendSignal(false);
195 bool bHandled(false);
196 {
197 CLockObject lock(m_mutex);
198 if (m_iPacketsLeft > 0)
199 {
200 /* decrease by 1 */
201 m_iPacketsLeft--;
202
203 /* log this message */
204 CStdString strLog;
205 strLog.Format("%s - command accepted", ToString());
206 if (m_iPacketsLeft > 0)
207 strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
208 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog);
209
210 /* no more packets left and not a transmission, so we're done */
211 if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
212 {
213 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
214 m_message->response = message.packet;
215 bSendSignal = true;
216 }
217 bHandled = true;
218 }
219 }
220
221 if (bSendSignal)
222 Signal();
223
224 return bHandled;
225 }
226
227 bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message)
228 {
229 {
230 CLockObject lock(m_mutex);
231 if (m_iPacketsLeft == 0)
232 {
233 /* transmission succeeded, so we're done */
234 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
235 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
236 m_message->response = message.packet;
237 }
238 else
239 {
240 /* error, we expected more acks
241 since the messages are processed in order, this should not happen, so this is an error situation */
242 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
243 m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
244 }
245 }
246
247 Signal();
248
249 return true;
250 }
251
252 bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message)
253 {
254 {
255 CLockObject lock(m_mutex);
256 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str());
257 m_message->response = message.packet;
258 if (m_message->IsTranmission())
259 m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
260 else
261 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
262 }
263
264 Signal();
265
266 return true;
267 }
268
269 bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void)
270 {
271 return m_queue && m_queue->ProvidesExtendedResponse();
272 }
273
274 bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const
275 {
276 return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0);
277 }
278
279 CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
280 PLATFORM::CThread(),
281 m_com(com),
282 m_iNextMessage(0)
283 {
284 m_incomingAdapterMessage = new CCECAdapterMessage;
285 m_currentCECFrame.Clear();
286 }
287
288 CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
289 {
290 Clear();
291 StopThread(0);
292 delete m_incomingAdapterMessage;
293 }
294
295 void CCECAdapterMessageQueue::Clear(void)
296 {
297 StopThread(5);
298 CLockObject lock(m_mutex);
299 m_writeQueue.Clear();
300 m_messages.clear();
301 }
302
303 void *CCECAdapterMessageQueue::Process(void)
304 {
305 CCECAdapterMessageQueueEntry *message(NULL);
306 while (!IsStopped())
307 {
308 /* wait for a new message */
309 if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message)
310 {
311 /* write this message */
312 {
313 CLockObject lock(m_mutex);
314 m_com->WriteToDevice(message->m_message);
315 }
316 if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR ||
317 message->m_message->Message() == MSGCODE_START_BOOTLOADER)
318 {
319 message->Signal();
320 Clear();
321 break;
322 }
323 }
324
325 CheckTimedOutMessages();
326 }
327 return NULL;
328 }
329
330 void CCECAdapterMessageQueue::CheckTimedOutMessages(void)
331 {
332 CLockObject lock(m_mutex);
333 vector<uint64_t> timedOut;
334 for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); it != m_messages.end(); it++)
335 {
336 if (it->second->TimedOutOrSucceeded())
337 {
338 timedOut.push_back(it->first);
339 if (!it->second->m_bSucceeded)
340 {
341 if (it->second->m_message->IsTranmission())
342 m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "transmission '%s' was not acked by the controller", it->second->m_message->ToString().c_str());
343 else
344 m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message()));
345 }
346 delete it->second->m_message;
347 delete it->second;
348 }
349 }
350
351 for (vector<uint64_t>::iterator it = timedOut.begin(); it != timedOut.end(); it++)
352 {
353 uint64_t iEntryId = *it;
354 m_messages.erase(iEntryId);
355 }
356 }
357
358 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
359 {
360 bool bHandled(false);
361 CLockObject lock(m_mutex);
362 /* send the received message to each entry in the queue until it is handled */
363 for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++)
364 bHandled = it->second->MessageReceived(msg);
365
366 if (!bHandled)
367 {
368 /* the message wasn't handled */
369 bool bIsError(m_com->HandlePoll(msg));
370 m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString().c_str());
371
372 /* push this message to the current frame */
373 if (!bIsError && msg.PushToCecCommand(m_currentCECFrame))
374 {
375 /* and push the current frame back over the callback method when a full command was received */
376 if (m_com->IsInitialised())
377 m_com->m_callback->OnCommandReceived(m_currentCECFrame);
378
379 /* clear the current frame */
380 m_currentCECFrame.Clear();
381 }
382 }
383 }
384
385 void CCECAdapterMessageQueue::AddData(uint8_t *data, size_t iLen)
386 {
387 for (size_t iPtr = 0; iPtr < iLen; iPtr++)
388 {
389 bool bFullMessage(false);
390 {
391 CLockObject lock(m_mutex);
392 bFullMessage = m_incomingAdapterMessage->PushReceivedByte(data[iPtr]);
393 }
394
395 if (bFullMessage)
396 {
397 /* a full message was received */
398 CCECAdapterMessage newMessage;
399 newMessage.packet = m_incomingAdapterMessage->packet;
400 MessageReceived(newMessage);
401
402 /* clear the current message */
403 CLockObject lock(m_mutex);
404 m_incomingAdapterMessage->Clear();
405 }
406 }
407 }
408
409 bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
410 {
411 msg->state = ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT;
412
413 /* set the correct line timeout */
414 if (msg->IsTranmission())
415 {
416 m_com->SetLineTimeout(msg->lineTimeout);
417 }
418
419 CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
420 if (!entry)
421 {
422 m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message()));
423 msg->state = ADAPTER_MESSAGE_STATE_ERROR;
424 return false;
425 }
426
427 uint64_t iEntryId(0);
428 /* add to the wait for ack queue */
429 if (msg->Message() != MSGCODE_START_BOOTLOADER)
430 {
431 CLockObject lock(m_mutex);
432 iEntryId = m_iNextMessage++;
433 m_messages.insert(make_pair(iEntryId, entry));
434 }
435
436 /* add the message to the write queue */
437 m_writeQueue.Push(entry);
438
439 bool bReturn(true);
440 if (!msg->bFireAndForget)
441 {
442 if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
443 {
444 if (msg->IsTranmission())
445 m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "transmission '%s' was not acked by the controller", msg->ToString().c_str());
446 else
447 m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
448 msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
449 bReturn = false;
450 }
451
452 if (msg->Message() != MSGCODE_START_BOOTLOADER)
453 {
454 CLockObject lock(m_mutex);
455 m_messages.erase(iEntryId);
456 }
457
458 if (msg->ReplyIsError())
459 msg->state = ADAPTER_MESSAGE_STATE_ERROR;
460
461 delete entry;
462 }
463
464 return bReturn;
465 }
466
467 bool CCECAdapterMessageQueue::ProvidesExtendedResponse(void)
468 {
469 return m_com && m_com->ProvidesExtendedResponse();
470 }