From: Lars Op den Kamp Date: Sun, 15 Apr 2012 19:19:49 +0000 (+0200) Subject: cec: fixed - transmissions can come in while waiting for a response to a command X-Git-Tag: upstream/2.2.0~1^2~31^2~5^2~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=8cdaa0596e85ac4d3997be1e1f0d142e5b1c38ee;p=deb_libcec.git cec: fixed - transmissions can come in while waiting for a response to a command --- diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/USBCECAdapterMessageQueue.cpp index e95a8a7..9c66051 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/USBCECAdapterMessageQueue.cpp @@ -37,6 +37,7 @@ using namespace CEC; using namespace PLATFORM; +using namespace std; CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) : m_message(message), @@ -54,51 +55,34 @@ void CCECAdapterMessageQueueEntry::Broadcast(void) bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message) { - bool bSendSignal(false); bool bHandled(false); - PLATFORM::CLockObject lock(m_mutex); - if (!IsResponse(message)) + if (IsResponse(message)) { - /* we received a message from the adapter that's not a response to this command */ - if (!message.IsTranmission()) - { - /* we received something that's not a transmission while waiting for an ack to this command, so this command failed */ - - //TODO verify whether we're not failing too soon - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - %s - not a response %s - failed", __FUNCTION__, ToString(), CCECAdapterMessage::ToString(message.Message())); - m_message->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - bSendSignal = true; - } - } - else - { - /* we received a response, so this message is handled */ - bHandled = true; switch (message.Message()) { case MSGCODE_COMMAND_ACCEPTED: - bSendSignal = MessageReceivedCommandAccepted(message); + bHandled = MessageReceivedCommandAccepted(message); break; case MSGCODE_TRANSMIT_SUCCEEDED: - bSendSignal = MessageReceivedTransmitSucceeded(message); + bHandled = MessageReceivedTransmitSucceeded(message); break; default: - bSendSignal = MessageReceivedResponse(message); + bHandled = MessageReceivedResponse(message); break; } } - /* signal the waiting thread when we're done */ - if (bSendSignal) - { - m_bSucceeded = true; - m_condition.Signal(); - } - return bHandled; } +void CCECAdapterMessageQueueEntry::Signal(void) +{ + CLockObject lock(m_mutex); + m_bSucceeded = true; + m_condition.Signal(); +} + bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout) { bool bReturn(false); @@ -151,63 +135,77 @@ const char *CCECAdapterMessageQueueEntry::ToString(void) const bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message) { bool bSendSignal(false); - if (m_iPacketsLeft == 0) - { - /* we received a "command accepted", but we're not waiting for one anymore */ - CLibCEC::AddLog(CEC_LOG_ERROR, "%s - received unexpected 'command accepted' message", ToString()); - m_message->state = ADAPTER_MESSAGE_STATE_ERROR; - bSendSignal = true; - } - else + bool bHandled(false); { - /* decrease number of acks we're waiting on by 1 */ + CLockObject lock(m_mutex); if (m_iPacketsLeft > 0) + { + /* decrease by 1 */ m_iPacketsLeft--; - /* log this message */ - CStdString strLog; - strLog.Format("%s - command accepted", ToString()); - if (m_iPacketsLeft > 0) - strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); - CLibCEC::AddLog(CEC_LOG_DEBUG, strLog); - - /* no more packets left and not a transmission, so we're done */ - if (!m_message->IsTranmission() && m_iPacketsLeft == 0) - { - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; - m_message->response = message.packet; - bSendSignal = true; + /* log this message */ + CStdString strLog; + strLog.Format("%s - command accepted", ToString()); + if (m_iPacketsLeft > 0) + strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); + CLibCEC::AddLog(CEC_LOG_DEBUG, strLog); + + /* no more packets left and not a transmission, so we're done */ + if (!m_message->IsTranmission() && m_iPacketsLeft == 0) + { + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + m_message->response = message.packet; + bSendSignal = true; + } + bHandled = true; } } - return bSendSignal; + + if (bSendSignal) + Signal(); + + return bHandled; } bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message) { - if (m_iPacketsLeft == 0) - { - /* transmission succeeded, so we're done */ - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; - m_message->response = message.packet; - } - else { - /* error, we expected more acks */ - CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); - m_message->state = ADAPTER_MESSAGE_STATE_ERROR; + CLockObject lock(m_mutex); + if (m_iPacketsLeft == 0) + { + /* transmission succeeded, so we're done */ + CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + m_message->response = message.packet; + } + else + { + /* error, we expected more acks + since the messages are processed in order, this should not happen, so this is an error situation */ + CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); + m_message->state = ADAPTER_MESSAGE_STATE_ERROR; + } } + + Signal(); + return true; } bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message) { - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString()); - m_message->response = message.packet; - if (m_message->IsTranmission()) - m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - else - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + { + CLockObject lock(m_mutex); + CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString()); + m_message->response = message.packet; + if (m_message->IsTranmission()) + m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; + else + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + } + + Signal(); + return true; } @@ -220,20 +218,18 @@ CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) void CCECAdapterMessageQueue::Clear(void) { CLockObject lock(m_mutex); - CCECAdapterMessageQueueEntry *message(NULL); - while (m_messages.Pop(message)) - message->Broadcast(); + m_messages.clear(); } void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { + bool bHandled(false); CLockObject lock(m_mutex); - CCECAdapterMessageQueueEntry *message = GetNextQueuedEntry(); + /* send the received message to each entry in the queue until it is handled */ + for (map::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++) + bHandled = it->second->MessageReceived(msg); - /* send the received message to the first entry in the queue */ - bool bHandled = message ? message->MessageReceived(msg) : false; - - if (!message || !bHandled) + if (!bHandled) { /* the message wasn't handled */ bool bIsError(m_com->HandlePoll(msg)); @@ -290,12 +286,14 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) } CCECAdapterMessageQueueEntry *entry(NULL); + uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) { + CLockObject lock(m_mutex); entry = new CCECAdapterMessageQueueEntry(msg); - PLATFORM::CLockObject lock(m_mutex); - m_messages.Push(entry); + iEntryId = m_iNextMessage++; + m_messages.insert(make_pair(iEntryId, entry)); } /* TODO write the message async */ @@ -306,28 +304,19 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) return false; } - if (entry && !entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) + bool bReturn(true); + if (entry) { - CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); - msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - return false; - } - - return true; -} - -CCECAdapterMessageQueueEntry *CCECAdapterMessageQueue::GetNextQueuedEntry(void) -{ - CCECAdapterMessageQueueEntry *message(NULL); - while (message == NULL && m_messages.Peek(message)) - { - if (!message->IsWaiting()) + if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) { - /* delete old messages */ - m_messages.Pop(message); - delete message; - message = NULL; + CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); + msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; + bReturn = false; } + + CLockObject lock(m_mutex); + m_messages.erase(iEntryId); } - return message; + + return bReturn; } diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.h b/src/lib/adapter/USBCECAdapterMessageQueue.h index 5163090..9c5be49 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.h +++ b/src/lib/adapter/USBCECAdapterMessageQueue.h @@ -32,6 +32,7 @@ */ #include "USBCECAdapterMessage.h" +#include namespace CEC { @@ -88,24 +89,29 @@ namespace CEC /*! * @brief Called when a 'command accepted' message was received. * @param message The message that was received. - * @return True when the waiting thread need to be signaled, false otherwise. + * @return True when the message was handled, false otherwise. */ bool MessageReceivedCommandAccepted(const CCECAdapterMessage &message); /*! * @brief Called when a 'transmit succeeded' message was received. * @param message The message that was received. - * @return True when the waiting thread need to be signaled, false otherwise. + * @return True when the message was handled, false otherwise. */ bool MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message); /*! * @brief Called when a message that's not a 'command accepted' or 'transmit succeeded' message was received. * @param message The message that was received. - * @return True when the waiting thread need to be signaled, false otherwise. + * @return True when the message was handled, false otherwise. */ bool MessageReceivedResponse(const CCECAdapterMessage &message); + /*! + * @brief Signals the waiting thread. + */ + void Signal(void); + CCECAdapterMessage * m_message; /**< the message that was sent */ uint8_t m_iPacketsLeft; /**< the amount of acks that we're waiting on */ bool m_bSucceeded; /**< true when the command received a response, false otherwise */ @@ -124,9 +130,9 @@ namespace CEC * @param com The communication handler callback to use. * @param iQueueSize The outgoing message queue size. */ - CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com, size_t iQueueSize = 64) : + CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) : m_com(com), - m_messages(iQueueSize) {} + m_iNextMessage(0) {} virtual ~CCECAdapterMessageQueue(void); /*! @@ -155,14 +161,10 @@ namespace CEC bool Write(CCECAdapterMessage *msg); private: - /*! - * @return The next message in the queue, or NULL if there is none. - */ - CCECAdapterMessageQueueEntry *GetNextQueuedEntry(void); - CUSBCECAdapterCommunication * m_com; /**< the communication handler */ PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */ - PLATFORM::SyncedBuffer m_messages; /**< the outgoing message queue */ + std::map m_messages; /**< the outgoing message queue */ + uint64_t m_iNextMessage; /**< the index of the next message */ CCECAdapterMessage m_incomingAdapterMessage; /**< the current incoming message that's being assembled */ cec_command m_currentCECFrame; /**< the current incoming CEC command that's being assembled */ };