cec: fixed - transmissions can come in while waiting for a response to a command
authorLars Op den Kamp <lars@opdenkamp.eu>
Sun, 15 Apr 2012 19:19:49 +0000 (21:19 +0200)
committerLars Op den Kamp <lars@opdenkamp.eu>
Sun, 15 Apr 2012 19:19:49 +0000 (21:19 +0200)
src/lib/adapter/USBCECAdapterMessageQueue.cpp
src/lib/adapter/USBCECAdapterMessageQueue.h

index e95a8a75921537889cd9d0c035fe2e775ad3f65c..9c66051f5e368640e27bc128e5185b55839dd77a 100644 (file)
@@ -37,6 +37,7 @@
 
 using namespace CEC;
 using namespace PLATFORM;
+using namespace std;
 
 CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) :
     m_message(message),
@@ -54,51 +55,34 @@ void CCECAdapterMessageQueueEntry::Broadcast(void)
 
 bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message)
 {
-  bool bSendSignal(false);
   bool bHandled(false);
 
-  PLATFORM::CLockObject lock(m_mutex);
-  if (!IsResponse(message))
+  if (IsResponse(message))
   {
-    /* we received a message from the adapter that's not a response to this command */
-    if (!message.IsTranmission())
-    {
-      /* we received something that's not a transmission while waiting for an ack to this command, so this command failed */
-
-      //TODO verify whether we're not failing too soon
-      CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - %s - not a response %s - failed", __FUNCTION__, ToString(), CCECAdapterMessage::ToString(message.Message()));
-      m_message->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
-      bSendSignal = true;
-    }
-  }
-  else
-  {
-    /* we received a response, so this message is handled */
-    bHandled = true;
     switch (message.Message())
     {
     case MSGCODE_COMMAND_ACCEPTED:
-      bSendSignal = MessageReceivedCommandAccepted(message);
+      bHandled = MessageReceivedCommandAccepted(message);
       break;
     case MSGCODE_TRANSMIT_SUCCEEDED:
-      bSendSignal = MessageReceivedTransmitSucceeded(message);
+      bHandled = MessageReceivedTransmitSucceeded(message);
       break;
     default:
-      bSendSignal = MessageReceivedResponse(message);
+      bHandled = MessageReceivedResponse(message);
       break;
     }
   }
 
-  /* signal the waiting thread when we're done */
-  if (bSendSignal)
-  {
-    m_bSucceeded = true;
-    m_condition.Signal();
-  }
-
   return bHandled;
 }
 
+void CCECAdapterMessageQueueEntry::Signal(void)
+{
+  CLockObject lock(m_mutex);
+  m_bSucceeded = true;
+  m_condition.Signal();
+}
+
 bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout)
 {
   bool bReturn(false);
@@ -151,63 +135,77 @@ const char *CCECAdapterMessageQueueEntry::ToString(void) const
 bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message)
 {
   bool bSendSignal(false);
-  if (m_iPacketsLeft == 0)
-  {
-    /* we received a "command accepted", but we're not waiting for one anymore */
-    CLibCEC::AddLog(CEC_LOG_ERROR, "%s - received unexpected 'command accepted' message", ToString());
-    m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
-    bSendSignal = true;
-  }
-  else
+  bool bHandled(false);
   {
-    /* decrease number of acks we're waiting on by 1 */
+    CLockObject lock(m_mutex);
     if (m_iPacketsLeft > 0)
+    {
+      /* decrease by 1 */
       m_iPacketsLeft--;
 
-    /* log this message */
-    CStdString strLog;
-    strLog.Format("%s - command accepted", ToString());
-    if (m_iPacketsLeft > 0)
-      strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
-    CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
-
-    /* no more packets left and not a transmission, so we're done */
-    if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
-    {
-      m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
-      m_message->response = message.packet;
-      bSendSignal = true;
+      /* log this message */
+      CStdString strLog;
+      strLog.Format("%s - command accepted", ToString());
+      if (m_iPacketsLeft > 0)
+        strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
+      CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
+
+      /* no more packets left and not a transmission, so we're done */
+      if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
+      {
+        m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+        m_message->response = message.packet;
+        bSendSignal = true;
+      }
+      bHandled = true;
     }
   }
-  return bSendSignal;
+
+  if (bSendSignal)
+    Signal();
+
+  return bHandled;
 }
 
 bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message)
 {
-  if (m_iPacketsLeft == 0)
-  {
-    /* transmission succeeded, so we're done */
-    CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
-    m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
-    m_message->response = message.packet;
-  }
-  else
   {
-    /* error, we expected more acks */
-    CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
-    m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+    CLockObject lock(m_mutex);
+    if (m_iPacketsLeft == 0)
+    {
+      /* transmission succeeded, so we're done */
+      CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
+      m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+      m_message->response = message.packet;
+    }
+    else
+    {
+      /* error, we expected more acks
+         since the messages are processed in order, this should not happen, so this is an error situation */
+      CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
+      m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+    }
   }
+
+  Signal();
+
   return true;
 }
 
 bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message)
 {
-  CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
-  m_message->response = message.packet;
-  if (m_message->IsTranmission())
-    m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
-  else
-    m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+  {
+    CLockObject lock(m_mutex);
+    CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
+    m_message->response = message.packet;
+    if (m_message->IsTranmission())
+      m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+    else
+      m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+  }
+
+  Signal();
+
   return true;
 }
 
@@ -220,20 +218,18 @@ CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
 void CCECAdapterMessageQueue::Clear(void)
 {
   CLockObject lock(m_mutex);
-  CCECAdapterMessageQueueEntry *message(NULL);
-  while (m_messages.Pop(message))
-    message->Broadcast();
+  m_messages.clear();
 }
 
 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
 {
+  bool bHandled(false);
   CLockObject lock(m_mutex);
-  CCECAdapterMessageQueueEntry *message = GetNextQueuedEntry();
+  /* send the received message to each entry in the queue until it is handled */
+  for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++)
+    bHandled = it->second->MessageReceived(msg);
 
-  /* send the received message to the first entry in the queue */
-  bool bHandled = message ? message->MessageReceived(msg) : false;
-
-  if (!message || !bHandled)
+  if (!bHandled)
   {
     /* the message wasn't handled */
     bool bIsError(m_com->HandlePoll(msg));
@@ -290,12 +286,14 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
   }
 
   CCECAdapterMessageQueueEntry *entry(NULL);
+  uint64_t iEntryId(0);
   /* add to the wait for ack queue */
   if (msg->Message() != MSGCODE_START_BOOTLOADER)
   {
+    CLockObject lock(m_mutex);
     entry = new CCECAdapterMessageQueueEntry(msg);
-    PLATFORM::CLockObject lock(m_mutex);
-    m_messages.Push(entry);
+    iEntryId = m_iNextMessage++;
+    m_messages.insert(make_pair(iEntryId, entry));
   }
 
   /* TODO write the message async */
@@ -306,28 +304,19 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
     return false;
   }
 
-  if (entry && !entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
+  bool bReturn(true);
+  if (entry)
   {
-    CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
-    msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
-    return false;
-  }
-
-  return true;
-}
-
-CCECAdapterMessageQueueEntry *CCECAdapterMessageQueue::GetNextQueuedEntry(void)
-{
-  CCECAdapterMessageQueueEntry *message(NULL);
-  while (message == NULL && m_messages.Peek(message))
-  {
-    if (!message->IsWaiting())
+    if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
     {
-      /* delete old messages */
-      m_messages.Pop(message);
-      delete message;
-      message = NULL;
+      CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
+      msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+      bReturn = false;
     }
+
+    CLockObject lock(m_mutex);
+    m_messages.erase(iEntryId);
   }
-  return message;
+
+  return bReturn;
 }
index 5163090b6e33676aa231c8222ba5e0f7fd747569..9c5be4969b0abb084b65eaa09197d89fea880012 100644 (file)
@@ -32,6 +32,7 @@
  */
 
 #include "USBCECAdapterMessage.h"
+#include <map>
 
 namespace CEC
 {
@@ -88,24 +89,29 @@ namespace CEC
     /*!
      * @brief Called when a 'command accepted' message was received.
      * @param message The message that was received.
-     * @return True when the waiting thread need to be signaled, false otherwise.
+     * @return True when the message was handled, false otherwise.
      */
     bool MessageReceivedCommandAccepted(const CCECAdapterMessage &message);
 
     /*!
      * @brief Called when a 'transmit succeeded' message was received.
      * @param message The message that was received.
-     * @return True when the waiting thread need to be signaled, false otherwise.
+     * @return True when the message was handled, false otherwise.
      */
     bool MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message);
 
     /*!
      * @brief Called when a message that's not a 'command accepted' or 'transmit succeeded' message was received.
      * @param message The message that was received.
-     * @return True when the waiting thread need to be signaled, false otherwise.
+     * @return True when the message was handled, false otherwise.
      */
     bool MessageReceivedResponse(const CCECAdapterMessage &message);
 
+    /*!
+     * @brief Signals the waiting thread.
+     */
+    void Signal(void);
+
     CCECAdapterMessage *       m_message;      /**< the message that was sent */
     uint8_t                    m_iPacketsLeft; /**< the amount of acks that we're waiting on */
     bool                       m_bSucceeded;   /**< true when the command received a response, false otherwise */
@@ -124,9 +130,9 @@ namespace CEC
      * @param com The communication handler callback to use.
      * @param iQueueSize The outgoing message queue size.
      */
-    CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com, size_t iQueueSize = 64) :
+    CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
       m_com(com),
-      m_messages(iQueueSize) {}
+      m_iNextMessage(0) {}
     virtual ~CCECAdapterMessageQueue(void);
 
     /*!
@@ -155,14 +161,10 @@ namespace CEC
     bool Write(CCECAdapterMessage *msg);
 
   private:
-    /*!
-     * @return The next message in the queue, or NULL if there is none.
-     */
-    CCECAdapterMessageQueueEntry *GetNextQueuedEntry(void);
-
     CUSBCECAdapterCommunication *                          m_com;                    /**< the communication handler */
     PLATFORM::CMutex                                       m_mutex;                  /**< mutex for changes to this class */
-    PLATFORM::SyncedBuffer<CCECAdapterMessageQueueEntry *> m_messages;               /**< the outgoing message queue */
+    std::map<uint64_t, CCECAdapterMessageQueueEntry *>     m_messages;               /**< the outgoing message queue */
+    uint64_t                                               m_iNextMessage;           /**< the index of the next message */
     CCECAdapterMessage                                     m_incomingAdapterMessage; /**< the current incoming message that's being assembled */
     cec_command                                            m_currentCECFrame;        /**< the current incoming CEC command that's being assembled */
   };