fixed - don't wait for a response when sending a reply, so we don't block message...
authorLars Op den Kamp <lars@opdenkamp.eu>
Wed, 3 Oct 2012 13:11:20 +0000 (15:11 +0200)
committerLars Op den Kamp <lars@opdenkamp.eu>
Wed, 3 Oct 2012 14:25:01 +0000 (16:25 +0200)
src/lib/CECProcessor.cpp
src/lib/adapter/Pulse-Eight/USBCECAdapterCommands.cpp
src/lib/adapter/Pulse-Eight/USBCECAdapterCommunication.cpp
src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.cpp
src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.h
src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp
src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.h

index 9d496404d30a338b61a9acbb43ef70f3c2b34c06..409b30b1cdde68d6f4ff79304a9fecc32a7d1316 100644 (file)
@@ -437,7 +437,9 @@ bool CCECProcessor::Transmit(const cec_command &data, bool bIsReply)
     iLineTimeout = m_iRetryLineTimeout;
   }
 
-  return adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED;
+  return bIsReply ?
+      adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED || adapterState == ADAPTER_MESSAGE_STATE_SENT || adapterState == ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT :
+      adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED;
 }
 
 void CCECProcessor::TransmitAbort(cec_logical_address source, cec_logical_address destination, cec_opcode opcode, cec_abort_reason reason /* = CEC_ABORT_REASON_UNRECOGNIZED_OPCODE */)
index ab0fa392e12633c54a0c237861bbae481b1fbfcb..cf97ea9f3593e7ab01320f25b7a2deff6811616e 100644 (file)
@@ -91,7 +91,7 @@ uint16_t CUSBCECAdapterCommands::RequestFirmwareVersion(void)
       m_persistedConfiguration.iFirmwareVersion = (response[0] << 8 | response[1]);
     else
     {
-      LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d)", iFwVersionTry);
+      LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d, size = %d)", iFwVersionTry, response.size);
       CEvent::Sleep(500);
     }
   }
index ae4c8ac60ebdc210e7706af77d55fac74dcf0bd5..a2c33e789fcac8b3793c2a2b2ac60a11ec84fe1c 100644 (file)
@@ -234,24 +234,33 @@ void CUSBCECAdapterCommunication::Close(void)
     m_port->Close();
 }
 
-cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool UNUSED(bIsReply))
+cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool bIsReply)
 {
   cec_adapter_message_state retVal(ADAPTER_MESSAGE_STATE_UNKNOWN);
   if (!IsRunning())
     return retVal;
 
   CCECAdapterMessage *output = new CCECAdapterMessage(data, iLineTimeout);
+  output->bFireAndForget = bIsReply;
 
   /* mark as waiting for an ack from the destination */
   MarkAsWaiting(data.destination);
 
   /* send the message */
-  bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
-  if (bRetry)
-    Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
-  retVal = output->state;
+  if (bIsReply)
+  {
+    retVal = m_adapterMessageQueue->Write(output) ?
+        ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT : ADAPTER_MESSAGE_STATE_ERROR;
+  }
+  else
+  {
+    bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
+    if (bRetry)
+      Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
+    retVal = output->state;
 
-  delete output;
+    delete output;
+  }
   return retVal;
 }
 
index a5be4120e5912ea6ab977c75d0669213ffeab5c9..f5d07359f28d8f32c97567384c71748d28809522 100644 (file)
@@ -266,6 +266,7 @@ void CCECAdapterMessage::Clear(void)
   packet.Clear();
   lineTimeout         = 3;
   bNextByteIsEscaped  = false;
+  bFireAndForget      = false;
 }
 
 void CCECAdapterMessage::Shift(uint8_t iShiftBy)
index 7c8020de8a0a2ae378b34e3ef1b88bc590a50146..89005e8694ff54419e7a93ddddc199dc5a9b50bc 100644 (file)
@@ -255,6 +255,7 @@ namespace CEC
     cec_adapter_message_state             state;                /**< the current state of this message */
     int32_t                               transmit_timeout;     /**< the timeout to use when sending this message */
     uint8_t                               lineTimeout;          /**< the default CEC line timeout to use when sending this message */
+    bool                                  bFireAndForget;       /**< true to auto delete, don't wait for a response */
 
   private:
     bool                                  bNextByteIsEscaped;   /**< true when the next byte that is added will be escaped, false otherwise */
index 4e035007e6423543f39e5158117b4e70b0b7a419..a75195270fece7715618244f0e39f18c7ceb8aeb 100644 (file)
@@ -50,7 +50,8 @@ CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQue
     m_message(message),
     m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
     m_bSucceeded(false),
-    m_bWaiting(true) {}
+    m_bWaiting(true),
+    m_queueTimeout(message->transmit_timeout) {}
 
 CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { }
 
@@ -270,6 +271,11 @@ bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void)
   return m_queue && m_queue->ProvidesExtendedResponse();
 }
 
+bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const
+{
+  return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0);
+}
+
 CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
   PLATFORM::CThread(),
   m_com(com),
@@ -315,10 +321,35 @@ void *CCECAdapterMessageQueue::Process(void)
         break;
       }
     }
+
+    CheckTimedOutMessages();
   }
   return NULL;
 }
 
+void CCECAdapterMessageQueue::CheckTimedOutMessages(void)
+{
+  CLockObject lock(m_mutex);
+  vector<uint64_t> timedOut;
+  for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); it != m_messages.end(); it++)
+  {
+    if (it->second->TimedOutOrSucceeded())
+    {
+      timedOut.push_back(it->first);
+      if (!it->second->m_bSucceeded)
+        m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message()));
+      delete it->second->m_message;
+      delete it->second;
+    }
+  }
+
+  for (vector<uint64_t>::iterator it = timedOut.begin(); it != timedOut.end(); it++)
+  {
+    uint64_t iEntryId = *it;
+    m_messages.erase(iEntryId);
+  }
+}
+
 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
 {
   bool bHandled(false);
@@ -381,6 +412,13 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
   }
 
   CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
+  if (!entry)
+  {
+    m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message()));
+    msg->state = ADAPTER_MESSAGE_STATE_ERROR;
+    return false;
+  }
+
   uint64_t iEntryId(0);
   /* add to the wait for ack queue */
   if (msg->Message() != MSGCODE_START_BOOTLOADER)
@@ -394,7 +432,7 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
   m_writeQueue.Push(entry);
 
   bool bReturn(true);
-  if (entry)
+  if (!msg->bFireAndForget)
   {
     if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
     {
index b81c908fafb6bf2b4ed5a7ba84df0b1b90441ffb..7e01436372250ec30ae63c7911f87343e6bdc82b 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "lib/platform/threads/threads.h"
 #include "lib/platform/util/buffer.h"
+#include "lib/platform/util/timeutils.h"
 #include <map>
 #include "USBCECAdapterMessage.h"
 
@@ -118,6 +119,11 @@ namespace CEC
 
     bool ProvidesExtendedResponse(void);
 
+    /*!
+     * @return True when a fire and forget packet timed out or succeeded, false otherwise
+     */
+    bool TimedOutOrSucceeded(void) const;
+
     CCECAdapterMessageQueue *  m_queue;
     CCECAdapterMessage *       m_message;      /**< the message that was sent */
     uint8_t                    m_iPacketsLeft; /**< the amount of acks that we're waiting on */
@@ -125,6 +131,7 @@ namespace CEC
     bool                       m_bWaiting;     /**< true while a thread is waiting or when it hasn't started waiting yet */
     PLATFORM::CCondition<bool> m_condition;    /**< the condition to wait on */
     PLATFORM::CMutex           m_mutex;        /**< mutex for changes to this class */
+    PLATFORM::CTimeout         m_queueTimeout;   /**< ack timeout for fire and forget commands */
   };
 
   class CCECAdapterMessageQueue : public PLATFORM::CThread
@@ -170,6 +177,8 @@ namespace CEC
 
     virtual void *Process(void);
 
+    void CheckTimedOutMessages(void);
+
   private:
     CUSBCECAdapterCommunication *                          m_com;                    /**< the communication handler */
     PLATFORM::CMutex                                       m_mutex;                  /**< mutex for changes to this class */