cec: async writes for CUSBCECAdapterCommunication. less locks and polls, more speed
authorLars Op den Kamp <lars@opdenkamp.eu>
Tue, 24 Apr 2012 19:43:08 +0000 (21:43 +0200)
committerLars Op den Kamp <lars@opdenkamp.eu>
Tue, 24 Apr 2012 20:32:19 +0000 (22:32 +0200)
src/lib/LibCEC.cpp
src/lib/LibCEC.h
src/lib/adapter/USBCECAdapterCommunication.cpp
src/lib/adapter/USBCECAdapterMessageQueue.cpp
src/lib/adapter/USBCECAdapterMessageQueue.h
src/lib/platform/util/buffer.h

index 585091eac6ce6bfef0ee6ed86033ef8898b0c50f..a59fae711964c3e1b32fded66d0793470eb9d505 100644 (file)
@@ -354,7 +354,7 @@ void CLibCEC::AddLog(const cec_log_level level, const char *strFormat, ...)
   CLibCEC *instance = CLibCEC::GetInstance();
   if (!instance)
     return;
-  CLockObject lock(instance->m_mutex);
+  CLockObject lock(instance->m_logMutex);
 
   cec_log_message message;
   message.level = level;
index adfb50753150d465132dcac577fa7ac2d15f1d67..e45980075b51cdee4d6ce32b9e2a3acaa0bf367b 100644 (file)
@@ -154,5 +154,6 @@ namespace CEC
       ICECCallbacks *                         m_callbacks;
       void *                                  m_cbParam;
       PLATFORM::CMutex                        m_mutex;
+      PLATFORM::CMutex                        m_logMutex;
   };
 };
index b53114ceacd744b87302d450c06436eb8e877191..828ea24cac2aaecb4bf5a40e85df6b52691243b5 100644 (file)
@@ -92,7 +92,10 @@ bool CUSBCECAdapterCommunication::Open(uint32_t iTimeoutMs /* = 10000 */, bool b
       m_commands = new CUSBCECAdapterCommands(this);
 
     if (!m_adapterMessageQueue)
+    {
       m_adapterMessageQueue = new CCECAdapterMessageQueue(this);
+      m_adapterMessageQueue->CreateThread();
+    }
 
     /* try to open the connection */
     CStdString strError;
@@ -166,6 +169,8 @@ void CUSBCECAdapterCommunication::Close(void)
       SetControlledMode(false);
   }
 
+  m_adapterMessageQueue->Clear();
+
   /* stop and delete the ping thread */
   if (m_pingThread)
     m_pingThread->StopThread(0);
@@ -193,9 +198,7 @@ cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &
 
   /* send the message */
   bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
-  if (output->state == ADAPTER_MESSAGE_STATE_ERROR)
-    Close();
-  else if (bRetry)
+  if (bRetry)
     Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
   retVal = output->state;
 
@@ -312,6 +315,7 @@ bool CUSBCECAdapterCommunication::WriteToDevice(CCECAdapterMessage *message)
   {
     CLibCEC::AddLog(CEC_LOG_DEBUG, "error writing command '%s' to serial port '%s': %s", CCECAdapterMessage::ToString(message->Message()), m_port->GetName().c_str(), m_port->GetError().c_str());
     message->state = ADAPTER_MESSAGE_STATE_ERROR;
+    Close();
     return false;
   }
 
@@ -332,15 +336,19 @@ bool CUSBCECAdapterCommunication::ReadFromDevice(uint32_t iTimeout, size_t iSize
     CLockObject lock(m_mutex);
     if (!m_port || !m_port->IsOpen())
       return false;
+
     iBytesRead = m_port->Read(buff, sizeof(uint8_t) * iSize, iTimeout);
+
+    if (m_port->GetErrorNumber())
+    {
+      CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str());
+      m_port->Close();
+      return false;
+    }
   }
 
   if (iBytesRead < 0 || iBytesRead > 256)
-  {
-    CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str());
-    Close();
     return false;
-  }
   else if (iBytesRead > 0)
   {
     /* add the data to the current frame */
index 56c81a6e4709438eef082d9aaf066a8380382856..64f9f00de0f6bf0dcd8bce9aa1c2166b9070f575 100644 (file)
@@ -213,14 +213,38 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess
 CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
 {
   Clear();
+  StopThread(0);
 }
 
 void CCECAdapterMessageQueue::Clear(void)
 {
+  StopThread(5);
   CLockObject lock(m_mutex);
+  m_writeQueue.Clear();
   m_messages.clear();
 }
 
+void *CCECAdapterMessageQueue::Process(void)
+{
+  CCECAdapterMessageQueueEntry *message(NULL);
+  while (!IsStopped())
+  {
+    /* wait for a new message */
+    if (m_writeQueue.Pop(message, 1000))
+    {
+      /* write this message */
+      m_com->WriteToDevice(message->m_message);
+      if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR)
+      {
+        message->Signal();
+        Clear();
+        break;
+      }
+    }
+  }
+  return NULL;
+}
+
 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
 {
   bool bHandled(false);
@@ -293,13 +317,8 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
     m_messages.insert(make_pair(iEntryId, entry));
   }
 
-  /* TODO write the message async */
-  if (!m_com->WriteToDevice(msg))
-  {
-    /* error! */
-    Clear();
-    return false;
-  }
+  /* add the message to the write queue */
+  m_writeQueue.Push(entry);
 
   bool bReturn(true);
   if (entry)
index 9c5be4969b0abb084b65eaa09197d89fea880012..e476e26660382a9359d12fc822054d99114e9c10 100644 (file)
@@ -32,6 +32,7 @@
  */
 
 #include "USBCECAdapterMessage.h"
+#include "../platform/threads/threads.h"
 #include <map>
 
 namespace CEC
@@ -85,7 +86,6 @@ namespace CEC
      */
     const char *ToString(void) const;
 
-  private:
     /*!
      * @brief Called when a 'command accepted' message was received.
      * @param message The message that was received.
@@ -120,7 +120,7 @@ namespace CEC
     PLATFORM::CMutex           m_mutex;        /**< mutex for changes to this class */
   };
 
-  class CCECAdapterMessageQueue
+  class CCECAdapterMessageQueue : public PLATFORM::CThread
   {
     friend class CUSBCECAdapterCommunication;
 
@@ -131,6 +131,7 @@ namespace CEC
      * @param iQueueSize The outgoing message queue size.
      */
     CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
+      PLATFORM::CThread(),
       m_com(com),
       m_iNextMessage(0) {}
     virtual ~CCECAdapterMessageQueue(void);
@@ -160,10 +161,13 @@ namespace CEC
      */
     bool Write(CCECAdapterMessage *msg);
 
+    virtual void *Process(void);
+
   private:
     CUSBCECAdapterCommunication *                          m_com;                    /**< the communication handler */
     PLATFORM::CMutex                                       m_mutex;                  /**< mutex for changes to this class */
     std::map<uint64_t, CCECAdapterMessageQueueEntry *>     m_messages;               /**< the outgoing message queue */
+    PLATFORM::SyncedBuffer<CCECAdapterMessageQueueEntry *> m_writeQueue;             /**< the queue for messages that are to be written */
     uint64_t                                               m_iNextMessage;           /**< the index of the next message */
     CCECAdapterMessage                                     m_incomingAdapterMessage; /**< the current incoming message that's being assembled */
     cec_command                                            m_currentCECFrame;        /**< the current incoming CEC command that's being assembled */
index 8777661df1c4e6942f937905126b868c2d2cce1f..aa658ee61f231b4d111ba9d1a19a82bcfd3c17a8 100644 (file)
@@ -41,7 +41,8 @@ namespace PLATFORM
     {
     public:
       SyncedBuffer(size_t iMaxSize = 100) :
-          m_maxSize(iMaxSize) {}
+          m_maxSize(iMaxSize),
+          m_bHasMessages(false) {}
 
       virtual ~SyncedBuffer(void)
       {
@@ -53,6 +54,7 @@ namespace PLATFORM
         CLockObject lock(m_mutex);
         while (!m_buffer.empty())
           m_buffer.pop();
+        m_condition.Broadcast();
       }
 
       size_t Size(void)
@@ -74,17 +76,29 @@ namespace PLATFORM
           return false;
 
         m_buffer.push(entry);
+        m_bHasMessages = true;
+        m_condition.Signal();
         return true;
       }
 
-      bool Pop(_BType &entry)
+      bool Pop(_BType &entry, uint32_t iTimeoutMs = 0)
       {
         bool bReturn(false);
         CLockObject lock(m_mutex);
+
+        // wait for a signal if the buffer is empty
+        if (m_buffer.empty() && iTimeoutMs > 0)
+        {
+          if (!m_condition.Wait(m_mutex, m_bHasMessages, iTimeoutMs))
+            return bReturn;
+        }
+
+        // pop the first item
         if (!m_buffer.empty())
         {
           entry = m_buffer.front();
           m_buffer.pop();
+          m_bHasMessages = !m_buffer.empty();
           bReturn = true;
         }
         return bReturn;
@@ -106,5 +120,7 @@ namespace PLATFORM
       size_t             m_maxSize;
       std::queue<_BType> m_buffer;
       CMutex             m_mutex;
+      CCondition<bool>   m_condition;
+      bool               m_bHasMessages;
     };
 };