cec: and now proper predicates
authorLars Op den Kamp <lars@opdenkamp.eu>
Sat, 11 Feb 2012 18:50:50 +0000 (19:50 +0100)
committerLars Op den Kamp <lars@opdenkamp.eu>
Sat, 11 Feb 2012 18:54:51 +0000 (19:54 +0100)
12 files changed:
src/cec-config/cec-config.cpp
src/lib/adapter/USBCECAdapterCommunication.cpp
src/lib/adapter/USBCECAdapterCommunication.h
src/lib/adapter/USBCECAdapterMessage.h
src/lib/implementations/CECCommandHandler.cpp
src/lib/implementations/CECCommandHandler.h
src/lib/platform/posix/os-threads.h
src/lib/platform/sockets/socket.h
src/lib/platform/threads/mutex.h
src/lib/platform/threads/threads.h
src/lib/platform/util/timeutils.h
src/testclient/main.cpp

index a853f9169f2adda9c0ea8073052e93a694d446f4..3fc7af3f656ced928b4d220c8b15950e8c86c031 100644 (file)
@@ -50,12 +50,10 @@ using namespace PLATFORM;
 
 CMutex                g_outputMutex;
 
-CMutex                g_responseMutex;
-CCondition            g_responseCondtion;
+CEvent                g_responseEvent;
 cec_opcode            g_lastCommand = CEC_OPCODE_NONE;
 
-CMutex                g_keyMutex;
-CCondition            g_keyCondtion;
+CEvent                g_keyEvent;
 cec_user_control_code g_lastKey = CEC_USER_CONTROL_CODE_UNKNOWN;
 
 ICECCallbacks         g_callbacks;
@@ -129,17 +127,15 @@ int CecLogMessage(void *UNUSED(cbParam), const cec_log_message &message)
 
 int CecKeyPress(void *UNUSED(cbParam), const cec_keypress &key)
 {
-  CLockObject lock(g_keyMutex);
   g_lastKey = key.keycode;
-  g_keyCondtion.Signal();
+  g_keyEvent.Signal();
   return 0;
 }
 
 int CecCommand(void *UNUSED(cbParam), const cec_command &command)
 {
-  CLockObject lock(g_responseMutex);
   g_lastCommand = command.opcode;
-  g_responseCondtion.Signal();
+  g_responseEvent.Signal();
   return 0;
 }
 
@@ -326,8 +322,7 @@ bool PowerOnTV(uint64_t iTimeout = 60000)
       g_parser->PowerOnDevices(CECDEVICE_TV);
       while (iTarget > iNow)
       {
-        CLockObject lock(g_responseMutex);
-        g_responseCondtion.Wait(g_responseMutex, (uint32_t)(iTarget - iNow));
+        g_responseEvent.Wait((uint32_t)(iTarget - iNow));
         if (g_lastCommand == CEC_OPCODE_REQUEST_ACTIVE_SOURCE)
           break;
         iNow = GetTimeMs();
index 32158de76513e32c7cce41c9b58a6488566318d5..8e0b44e58b57e72fbf440d123f1da58b1094fcb9 100644 (file)
@@ -43,6 +43,7 @@ using namespace PLATFORM;
 CUSBCECAdapterCommunication::CUSBCECAdapterCommunication(CCECProcessor *processor, const char *strPort, uint16_t iBaudRate /* = 38400 */) :
     m_port(NULL),
     m_processor(processor),
+    m_bHasData(false),
     m_iLineTimeout(0),
     m_iFirmwareVersion(CEC_FW_VERSION_UNKNOWN),
     m_lastInitiator(CECDEVICE_UNKNOWN),
@@ -174,6 +175,7 @@ void CUSBCECAdapterCommunication::Close(void)
 {
   SetAckMask(0);
   CLockObject lock(m_mutex);
+  m_bHasData = true;
   m_rcvCondition.Broadcast();
   StopThread();
 }
@@ -203,7 +205,7 @@ void *CUSBCECAdapterCommunication::Process(void)
 
   CCECAdapterMessage *msg(NULL);
   if (m_outBuffer.Pop(msg))
-    msg->condition.Broadcast();
+    msg->event.Broadcast();
 
   if (m_port)
   {
@@ -247,10 +249,9 @@ cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &
 
 bool CUSBCECAdapterCommunication::Write(CCECAdapterMessage *data)
 {
-  CLockObject lock(data->mutex);
   data->state = ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT;
   m_outBuffer.Push(data);
-  data->condition.Wait(data->mutex);
+  data->event.Wait();
 
   if ((data->expectControllerAck && data->state != ADAPTER_MESSAGE_STATE_SENT_ACKED) ||
       (!data->expectControllerAck && data->state != ADAPTER_MESSAGE_STATE_SENT))
@@ -289,9 +290,10 @@ bool CUSBCECAdapterCommunication::Read(CCECAdapterMessage &msg, uint32_t iTimeou
 
   if (!m_inBuffer.Pop(buf))
   {
-    if (iTimeout == 0 || !m_rcvCondition.Wait(m_mutex, iTimeout))
+    if (iTimeout == 0 || !m_rcvCondition.Wait(m_mutex, m_bHasData, iTimeout))
       return false;
     m_inBuffer.Pop(buf);
+    m_bHasData = m_inBuffer.Size() > 0;
   }
 
   if (buf)
@@ -626,6 +628,7 @@ void CUSBCECAdapterCommunication::AddData(uint8_t *data, size_t iLen)
       m_currentAdapterMessage.Clear();
       m_bGotStart = false;
       m_bNextIsEscaped = false;
+      m_bHasData = true;
       m_rcvCondition.Signal();
     }
     else if (m_bNextIsEscaped)
@@ -671,7 +674,6 @@ bool CUSBCECAdapterCommunication::ReadFromDevice(uint32_t iTimeout, size_t iSize
 void CUSBCECAdapterCommunication::SendMessageToAdapter(CCECAdapterMessage *msg)
 {
   CLockObject adapterLock(m_mutex);
-  CLockObject lock(msg->mutex);
   if (msg->tries == 1)
     SetLineTimeout(msg->lineTimeout);
   else
@@ -693,7 +695,7 @@ void CUSBCECAdapterCommunication::SendMessageToAdapter(CCECAdapterMessage *msg)
         CLibCEC::AddLog(CEC_LOG_DEBUG, "did not receive ack");
     }
   }
-  msg->condition.Signal();
+  msg->event.Signal();
 }
 
 void CUSBCECAdapterCommunication::WriteNextCommand(void)
index 659e65922d1a519fb946ffda8a7817d071f68e71..d2de30e17447c7f41e15b895dd5ee5aa73af4613 100644 (file)
@@ -85,7 +85,8 @@ namespace CEC
     PLATFORM::SyncedBuffer<CCECAdapterMessage *> m_inBuffer;
     PLATFORM::SyncedBuffer<CCECAdapterMessage *> m_outBuffer;
     PLATFORM::CMutex                             m_mutex;
-    PLATFORM::CCondition                         m_rcvCondition;
+    PLATFORM::CCondition<volatile bool &>        m_rcvCondition;
+    volatile bool                                m_bHasData;
     uint8_t                                      m_iLineTimeout;
     uint16_t                                     m_iFirmwareVersion;
     cec_command                                  m_currentframe;
index ae41177b4f7ef4109cebd403c179009c0c585b15..d3f5d3e63f0108d58595d72a258c3161fce5d9dc 100644 (file)
@@ -346,17 +346,16 @@ namespace CEC
           CECDEVICE_UNKNOWN;
     }
 
-    uint8_t                   maxTries;
-    uint8_t                   tries;
-    cec_adapter_messagecode   reply;
-    cec_datapacket            packet;
-    cec_adapter_message_state state;
-    int32_t                   transmit_timeout;
-    bool                      isTransmission;
-    bool                      expectControllerAck;
-    uint8_t                   lineTimeout;
-    uint8_t                   retryTimeout;
-    PLATFORM::CMutex          mutex;
-    PLATFORM::CCondition      condition;
+    uint8_t                               maxTries;
+    uint8_t                               tries;
+    cec_adapter_messagecode               reply;
+    cec_datapacket                        packet;
+    cec_adapter_message_state             state;
+    int32_t                               transmit_timeout;
+    bool                                  isTransmission;
+    bool                                  expectControllerAck;
+    uint8_t                               lineTimeout;
+    uint8_t                               retryTimeout;
+    PLATFORM::CEvent                      event;
   };
 }
index 0f006272e2d2d45d1a426350013f114ecf60489c..44cf477b3507fa0d9073a577e7fbee887514fff1 100644 (file)
@@ -51,7 +51,8 @@ CCECCommandHandler::CCECCommandHandler(CCECBusDevice *busDevice) :
     m_iUseCounter(0),
     m_expectedResponse(CEC_OPCODE_NONE),
     m_bOPTSendDeckStatusUpdateOnActiveSource(false),
-    m_vendorId(CEC_VENDOR_UNKNOWN)
+    m_vendorId(CEC_VENDOR_UNKNOWN),
+    m_bRcvSignal(false)
 {
 }
 
@@ -195,7 +196,10 @@ bool CCECCommandHandler::HandleCommand(const cec_command &command)
     CLockObject lock(m_receiveMutex);
     if (m_expectedResponse == CEC_OPCODE_NONE ||
         m_expectedResponse == command.opcode)
+    {
+      m_bRcvSignal = true;
       m_condition.Signal();
+    }
   }
 
   MarkReady();
@@ -962,7 +966,9 @@ bool CCECCommandHandler::Transmit(cec_command &command, bool bExpectResponse /*
       {
         CLibCEC::AddLog(CEC_LOG_DEBUG, "command transmitted");
         if (bExpectResponse)
-          bReturn = m_condition.Wait(m_receiveMutex, m_iTransmitWait);
+          bReturn = m_condition.Wait(m_receiveMutex, m_bRcvSignal, m_iTransmitWait);
+        if (bReturn)
+          m_bRcvSignal = false;
       }
     }
     --m_iUseCounter;
index be553a9cc61136bb0a72f147e758f860164ec992..d3cbe3ee1f30491ce9aff975d76fed98acabc9ab 100644 (file)
@@ -134,17 +134,18 @@ namespace CEC
 
     virtual bool Transmit(cec_command &command, bool bExpectResponse = true, cec_opcode expectedResponse = CEC_OPCODE_NONE);
 
-    CCECBusDevice *      m_busDevice;
-    CCECProcessor *      m_processor;
-    int32_t              m_iTransmitTimeout;
-    int32_t              m_iTransmitWait;
-    int8_t               m_iTransmitRetries;
-    bool                 m_bHandlerInited;
-    uint8_t              m_iUseCounter;
-    cec_opcode           m_expectedResponse;
-    bool                 m_bOPTSendDeckStatusUpdateOnActiveSource;
-    cec_vendor_id        m_vendorId;
-    PLATFORM::CMutex     m_receiveMutex;
-    PLATFORM::CCondition m_condition;
+    CCECBusDevice *                       m_busDevice;
+    CCECProcessor *                       m_processor;
+    int32_t                               m_iTransmitTimeout;
+    int32_t                               m_iTransmitWait;
+    int8_t                                m_iTransmitRetries;
+    bool                                  m_bHandlerInited;
+    uint8_t                               m_iUseCounter;
+    cec_opcode                            m_expectedResponse;
+    bool                                  m_bOPTSendDeckStatusUpdateOnActiveSource;
+    cec_vendor_id                         m_vendorId;
+    PLATFORM::CMutex                      m_receiveMutex;
+    PLATFORM::CCondition<volatile bool &> m_condition;
+    volatile bool                         m_bRcvSignal;
   };
 };
index 48e1e355ca16e8ee18965852cda41bfd48329c63..5216ffe7793ac40bea9c28bf93ffbb6e7cba986b 100644 (file)
@@ -92,17 +92,22 @@ namespace PLATFORM
       pthread_cond_broadcast(&m_condition);
     }
 
-    bool Wait(mutex_t &mutex, uint32_t iTimeoutMs)
+    bool Wait(mutex_t &mutex)
     {
       sched_yield();
-      if (iTimeoutMs > 0)
-      {
-        struct timespec timeout = GetAbsTime(iTimeoutMs);
-        return (pthread_cond_timedwait(&m_condition, &mutex, &timeout) == 0);
-      }
       return (pthread_cond_wait(&m_condition, &mutex) == 0);
     }
 
+    bool Wait(mutex_t &mutex, uint32_t iTimeoutMs)
+    {
+      if (iTimeoutMs == 0)
+        return Wait(mutex);
+
+      sched_yield();
+      struct timespec timeout = GetAbsTime(iTimeoutMs);
+      return (pthread_cond_timedwait(&m_condition, &mutex, &timeout) == 0);
+    }
+
     pthread_cond_t m_condition;
   };
 }
index 58a3e3a751efbcad16462fc173a73917703e074f..369fbe4c9f78101ba6cef915e1bff37e4737f8e2 100644 (file)
@@ -105,7 +105,7 @@ namespace PLATFORM
   public:
     CProtectedSocket(_Socket *socket) :
       m_socket(socket),
-      m_iUseCount(0) {}
+      m_bIsIdle(true) {}
 
     virtual ~CProtectedSocket(void)
     {
@@ -151,13 +151,13 @@ namespace PLATFORM
     virtual bool IsBusy(void)
     {
       CLockObject lock(m_mutex);
-      return m_socket && m_iUseCount > 0;
+      return m_socket && !m_bIsIdle;
     }
 
-    virtual int GetUseCount(void)
+    virtual bool IsIdle(void)
     {
       CLockObject lock(m_mutex);
-      return m_iUseCount;
+      return m_socket && m_bIsIdle;
     }
 
     virtual ssize_t Write(void* data, size_t len)
@@ -208,27 +208,21 @@ namespace PLATFORM
     bool WaitReady(void)
     {
       CLockObject lock(m_mutex);
-      if (m_iUseCount > 0)
-        m_condition.Wait(m_mutex);
-
-      if (m_iUseCount > 0)
-        return false;
-
-      ++m_iUseCount;
+      m_condition.Wait(m_mutex, m_bIsIdle);
+      m_bIsIdle = false;
       return true;
     }
 
     void MarkReady(void)
     {
       CLockObject lock(m_mutex);
-      if (m_iUseCount > 0)
-        --m_iUseCount;
+      m_bIsIdle = true;
       m_condition.Signal();
     }
 
-    _Socket   *m_socket;
-    CMutex     m_mutex;
-    CCondition m_condition;
-    int        m_iUseCount;
+    _Socket *          m_socket;
+    CMutex             m_mutex;
+    CCondition<bool &> m_condition;
+    bool               m_bIsIdle;
   };
 };
index e8481012773e428561c0e1f7c7652117926aa421..11e3606d09250e5d715eb116ad2a3d2ca939117a 100644 (file)
@@ -39,6 +39,8 @@
 #include "../posix/os-threads.h"
 #endif
 
+#include "../util/timeutils.h"
+
 namespace PLATFORM
 {
   class PreventCopy
@@ -52,11 +54,13 @@ namespace PLATFORM
     inline PreventCopy &operator=(const PreventCopy &c){ *this = c; return *this; }
   };
 
-  class CCondition;
+  template <typename _Predicate>
+    class CCondition;
 
   class CMutex : public PreventCopy
   {
-    friend class CCondition;
+    template <typename _Predicate>
+      friend class CCondition;
   public:
     inline CMutex(void) :
       m_iLockCount(0)
@@ -221,39 +225,126 @@ namespace PLATFORM
     volatile bool m_bIsLocked;
   };
 
-  class CCondition : public PreventCopy
+  template <typename _Predicate>
+    class CCondition : public PreventCopy
+    {
+    public:
+      inline CCondition(void) {}
+      inline ~CCondition(void)
+      {
+        m_condition.Broadcast();
+      }
+
+      inline void Broadcast(void)
+      {
+        m_condition.Broadcast();
+      }
+
+      inline void Signal(void)
+      {
+        m_condition.Signal();
+      }
+
+      inline bool Wait(CMutex &mutex, _Predicate &predicate)
+      {
+        while(!predicate)
+          m_condition.Wait(mutex.m_mutex);
+        return true;
+      }
+
+      inline bool Wait(CMutex &mutex, _Predicate &predicate, uint32_t iTimeout)
+      {
+        if (iTimeout == 0)
+          return Wait(mutex, predicate);
+
+        bool bReturn(true);
+        if (!predicate)
+        {
+          CTimeout timeout(iTimeout);
+          uint64_t iMsLeft(0);
+          bReturn = false;
+          while (!bReturn)
+          {
+            iMsLeft = timeout.TimeLeft();
+            if ((bReturn = iMsLeft == 0 || predicate) == false)
+              m_condition.Wait(mutex.m_mutex, iMsLeft);
+          }
+        }
+        return bReturn;
+      }
+
+    private:
+      CConditionImpl m_condition;
+    };
+
+  class CEvent
   {
   public:
-    inline CCondition(void) {}
-    inline ~CCondition(void)
+    CEvent(void) :
+      m_bSignaled(false),
+      m_bBroadcast(false),
+      m_iWaitingThreads(0) {}
+    virtual ~CEvent(void) {}
+
+    void Broadcast(void)
     {
+      Set(true);
       m_condition.Broadcast();
     }
 
-    inline void Broadcast(void)
+    void Signal(void)
     {
-      m_condition.Broadcast();
+      Set(false);
+      m_condition.Signal();
     }
 
-    inline void Signal(void)
+    bool Wait(void)
     {
-      m_condition.Signal();
+      CLockObject lock(m_mutex);
+      ++m_iWaitingThreads;
+
+      bool bReturn = m_condition.Wait(m_mutex, m_bSignaled);
+      return ResetAndReturn() && bReturn;
     }
 
-    inline bool Wait(CMutex &mutex, uint32_t iTimeout = 0)
+    bool Wait(uint32_t iTimeout)
     {
-      return m_condition.Wait(mutex.m_mutex, iTimeout);
+      if (iTimeout == 0)
+        return Wait();
+
+      CLockObject lock(m_mutex);
+      ++m_iWaitingThreads;
+      bool bReturn = m_condition.Wait(m_mutex, m_bSignaled, iTimeout);
+      return ResetAndReturn() && bReturn;
     }
 
     static void Sleep(uint32_t iTimeout)
     {
-      CCondition w;
-      CMutex m;
-      CLockObject lock(m);
-      w.Wait(m, iTimeout);
+      CEvent event;
+      event.Wait(iTimeout);
     }
 
   private:
-    CConditionImpl m_condition;
+    void Set(bool bBroadcast = false)
+    {
+      CLockObject lock(m_mutex);
+      m_bSignaled  = true;
+      m_bBroadcast = bBroadcast;
+    }
+
+    bool ResetAndReturn(void)
+    {
+      CLockObject lock(m_mutex);
+      bool bReturn(m_bSignaled);
+      if (bReturn && (--m_iWaitingThreads == 0 || !m_bBroadcast))
+        m_bSignaled = false;
+      return bReturn;
+    }
+
+    volatile bool              m_bSignaled;
+    CCondition<volatile bool&> m_condition;
+    CMutex                     m_mutex;
+    volatile bool              m_bBroadcast;
+    unsigned int               m_iWaitingThreads;
   };
 }
index 8fb71fce2b2158382168379054d30750b1e0c4dc..f2de310a23749f7960ef3882008c61e317c5eeb9 100644 (file)
@@ -40,7 +40,8 @@ namespace PLATFORM
   public:
     CThread(void) :
         m_bStop(false),
-        m_bRunning(false) {}
+        m_bRunning(false),
+        m_bStopped(false) {}
 
     virtual ~CThread(void)
     {
@@ -53,17 +54,21 @@ namespace PLATFORM
 
       if (thread)
       {
-        CLockObject lock(thread->m_threadMutex);
-        thread->m_bRunning = true;
-        lock.Unlock();
-        thread->m_threadCondition.Broadcast();
+        {
+          CLockObject lock(thread->m_threadMutex);
+          thread->m_bRunning = true;
+          thread->m_bStopped = false;
+          thread->m_threadCondition.Broadcast();
+        }
 
         retVal = thread->Process();
 
-        lock.Lock();
-        thread->m_bRunning = false;
-        lock.Unlock();
-        thread->m_threadCondition.Broadcast();
+        {
+          CLockObject lock(thread->m_threadMutex);
+          thread->m_bRunning = false;
+          thread->m_bStopped = true;
+          thread->m_threadCondition.Broadcast();
+        }
       }
 
       return retVal;
@@ -91,7 +96,7 @@ namespace PLATFORM
           if (ThreadsCreate(m_thread, CThread::ThreadHandler, ((void*)static_cast<CThread *>(this))))
           {
             if (bWait)
-              m_threadCondition.Wait(m_threadMutex);
+              m_threadCondition.Wait(m_threadMutex, m_bRunning);
             bReturn = true;
           }
         }
@@ -106,7 +111,6 @@ namespace PLATFORM
         CLockObject lock(m_threadMutex);
         bRunning = IsRunning();
         m_bStop = true;
-        m_threadCondition.Broadcast();
       }
 
       if (bRunning && bWaitForExit)
@@ -120,7 +124,7 @@ namespace PLATFORM
     virtual bool Sleep(uint32_t iTimeout)
     {
       CLockObject lock(m_threadMutex);
-      return m_bStop ? false : m_threadCondition.Wait(m_threadMutex, iTimeout);
+      return m_bStop ? false : m_threadCondition.Wait(m_threadMutex, m_bStopped, iTimeout);
     }
 
     virtual void *Process(void) = 0;
@@ -129,10 +133,11 @@ namespace PLATFORM
     void SetRunning(bool bSetTo);
 
   private:
-    bool       m_bStop;
-    bool       m_bRunning;
-    CCondition m_threadCondition;
-    CMutex     m_threadMutex;
-    thread_t   m_thread;
+    bool               m_bStop;
+    bool               m_bRunning;
+    bool               m_bStopped;
+    CCondition<bool &> m_threadCondition;
+    CMutex             m_threadMutex;
+    thread_t           m_thread;
   };
 };
index d2992e75d0d46bebefe4e50741cae358ecbe1865..367c80bbbc3c1db707046e2efec38f4d1a3f9b6d 100644 (file)
@@ -103,4 +103,20 @@ namespace PLATFORM
   {
     return (T)GetTimeMs() / (T)1000.0;
   }
+
+  class CTimeout
+  {
+  public:
+    CTimeout(uint32_t iTime) :
+      m_iTarget(GetTimeMs() + iTime) {}
+
+    uint64_t TimeLeft(void) const
+    {
+      uint64_t iNow = GetTimeMs();
+      return (iNow > m_iTarget) ? 0 : m_iTarget - iNow;
+    }
+
+  private:
+    uint64_t m_iTarget;
+  };
 };
index dde998014b212078277dbe1d95d90bbcbc3608c6..75172498e5823845a35d2c2f8ef3f7f2519ba9aa 100644 (file)
@@ -1127,7 +1127,7 @@ int main (int argc, char *argv[])
       g_bExit = true;
 
     if (!g_bExit && !g_bHardExit)
-      CCondition::Sleep(50);
+      CEvent::Sleep(50);
   }
 
   if (!g_bSingleCommand && !g_bHardExit)