Merge branch 'development'
[deb_libcec.git] / src / lib / adapter / USBCECAdapterMessageQueue.cpp
index 9c66051f5e368640e27bc128e5185b55839dd77a..a270a78235792a39a81a6fd746088c7155cea175 100644 (file)
@@ -39,6 +39,8 @@ using namespace CEC;
 using namespace PLATFORM;
 using namespace std;
 
+#define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
+
 CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) :
     m_message(message),
     m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
@@ -196,7 +198,7 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess
 {
   {
     CLockObject lock(m_mutex);
-    CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
+    CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str());
     m_message->response = message.packet;
     if (m_message->IsTranmission())
       m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
@@ -213,14 +215,42 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess
 CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
 {
   Clear();
+  StopThread(0);
 }
 
 void CCECAdapterMessageQueue::Clear(void)
 {
+  StopThread(5);
   CLockObject lock(m_mutex);
+  m_writeQueue.Clear();
   m_messages.clear();
 }
 
+void *CCECAdapterMessageQueue::Process(void)
+{
+  CCECAdapterMessageQueueEntry *message(NULL);
+  while (!IsStopped())
+  {
+    /* wait for a new message */
+    if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message)
+    {
+      /* write this message */
+      {
+        CLockObject lock(m_mutex);
+        m_com->WriteToDevice(message->m_message);
+      }
+      if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR ||
+          message->m_message->Message() == MSGCODE_START_BOOTLOADER)
+      {
+        message->Signal();
+        Clear();
+        break;
+      }
+    }
+  }
+  return NULL;
+}
+
 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
 {
   bool bHandled(false);
@@ -279,30 +309,21 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
   /* set the correct line timeout */
   if (msg->IsTranmission())
   {
-    if (msg->tries == 1)
-      m_com->SetLineTimeout(msg->lineTimeout);
-    else
-      m_com->SetLineTimeout(msg->retryTimeout);
+    m_com->SetLineTimeout(msg->lineTimeout);
   }
 
-  CCECAdapterMessageQueueEntry *entry(NULL);
+  CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(msg);
   uint64_t iEntryId(0);
   /* add to the wait for ack queue */
   if (msg->Message() != MSGCODE_START_BOOTLOADER)
   {
     CLockObject lock(m_mutex);
-    entry = new CCECAdapterMessageQueueEntry(msg);
     iEntryId = m_iNextMessage++;
     m_messages.insert(make_pair(iEntryId, entry));
   }
 
-  /* TODO write the message async */
-  if (!m_com->WriteToDevice(msg))
-  {
-    /* error! */
-    Clear();
-    return false;
-  }
+  /* add the message to the write queue */
+  m_writeQueue.Push(entry);
 
   bool bReturn(true);
   if (entry)
@@ -314,8 +335,12 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
       bReturn = false;
     }
 
-    CLockObject lock(m_mutex);
-    m_messages.erase(iEntryId);
+    if (msg->Message() != MSGCODE_START_BOOTLOADER)
+    {
+      CLockObject lock(m_mutex);
+      m_messages.erase(iEntryId);
+    }
+    delete entry;
   }
 
   return bReturn;