cec: extracted a thread class. fixed bug: pthread_cond_wait was called without the...
authorLars Op den Kamp <lars@opdenkamp.eu>
Mon, 3 Oct 2011 19:40:00 +0000 (21:40 +0200)
committerLars Op den Kamp <lars@opdenkamp.eu>
Mon, 3 Oct 2011 19:40:00 +0000 (21:40 +0200)
src/lib/CECParser.cpp
src/lib/CECParser.h
src/lib/Communication.cpp
src/lib/util/threads.cpp
src/lib/util/threads.h

index 7a6d871700a7d701ec0eccf572a9c62d3e883afa..5f242869be3c3f5213f66bdadaec5f821871de6f 100644 (file)
@@ -58,8 +58,7 @@ CCECParser::CCECParser(const char *strDeviceName, cec_logical_address iLogicalAd
     m_iCurrentButton(CEC_USER_CONTROL_CODE_UNKNOWN),
     m_physicaladdress(iPhysicalAddress),
     m_iLogicalAddress(iLogicalAddress),
-    m_strDeviceName(strDeviceName),
-    m_bRunning(false)
+    m_strDeviceName(strDeviceName)
 {
   m_communication = new CCommunication(this);
 }
@@ -94,43 +93,36 @@ bool CCECParser::Open(const char *strPort, int iTimeoutMs /* = 10000 */)
     return false;
   }
 
-  if (pthread_create(&m_thread, NULL, (void *(*) (void *))&CCECParser::ThreadHandler, (void *)this) == 0)
-  {
-    m_bRunning = true;
-    AddLog(CEC_LOG_DEBUG, "processor thread created");
-    pthread_detach(m_thread);
+  if (CreateThread())
     return true;
-  }
   else
-  {
     AddLog(CEC_LOG_ERROR, "could not create a processor thread");
-    m_bRunning = false;
-  }
 
   return false;
 }
 
 void CCECParser::Close(void)
 {
-  m_bRunning = false;
-  pthread_join(m_thread, NULL);
+  StopThread();
 }
 
-void *CCECParser::ThreadHandler(CCECParser *parser)
+void *CCECParser::Process(void)
 {
-  if (parser)
-    parser->Process();
-  return 0;
-}
+  AddLog(CEC_LOG_DEBUG, "processor thread started");
 
-bool CCECParser::Process(void)
-{
   int64_t now = GetTimeMs();
-  while (m_bRunning)
+  while (!m_bStop)
   {
-    cec_frame msg;
-    while (m_bRunning && m_communication->IsOpen() && m_communication->Read(msg, CEC_BUTTON_TIMEOUT))
-      ParseMessage(msg);
+    bool bParseFrame(false);
+    {
+      CLockObject lock(&m_mutex);
+      cec_frame msg;
+      if (!m_bStop && m_communication->IsOpen() && m_communication->Read(msg, CEC_BUTTON_TIMEOUT))
+        bParseFrame = ParseMessage(msg);
+    }
+
+    if (bParseFrame)
+      ParseCurrentFrame();
 
     now = GetTimeMs();
     CheckKeypressTimeout(now);
@@ -138,14 +130,12 @@ bool CCECParser::Process(void)
   }
 
   AddLog(CEC_LOG_DEBUG, "processor thread terminated");
-  m_bRunning = false;
-  m_exitCondition.Signal();
-  return true;
+  return NULL;
 }
 
 bool CCECParser::Ping(void)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   AddLog(CEC_LOG_DEBUG, "sending ping");
@@ -168,7 +158,7 @@ bool CCECParser::Ping(void)
 
 bool CCECParser::StartBootloader(void)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   AddLog(CEC_LOG_DEBUG, "starting the bootloader");
@@ -199,7 +189,7 @@ bool CCECParser::PowerOffDevices(cec_logical_address address /* = CECDEVICE_BROA
 
 bool CCECParser::PowerOnDevices(cec_logical_address address /* = CECDEVICE_TV */)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   CStdString strLog;
@@ -213,7 +203,7 @@ bool CCECParser::PowerOnDevices(cec_logical_address address /* = CECDEVICE_TV */
 
 bool CCECParser::StandbyDevices(cec_logical_address address /* = CECDEVICE_BROADCAST */)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   CStdString strLog;
@@ -227,7 +217,7 @@ bool CCECParser::StandbyDevices(cec_logical_address address /* = CECDEVICE_BROAD
 
 bool CCECParser::SetActiveView(void)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   AddLog(CEC_LOG_DEBUG, "setting active view");
@@ -241,7 +231,7 @@ bool CCECParser::SetActiveView(void)
 
 bool CCECParser::SetInactiveView(void)
 {
-  if (!m_bRunning)
+  if (!IsRunning())
     return false;
 
   AddLog(CEC_LOG_DEBUG, "setting inactive view");
@@ -260,12 +250,12 @@ bool CCECParser::GetNextLogMessage(cec_log_message *message)
 
 bool CCECParser::GetNextKeypress(cec_keypress *key)
 {
-  return m_bRunning ? m_keyBuffer.Pop(*key) : false;
+  return IsRunning() ? m_keyBuffer.Pop(*key) : false;
 }
 
 bool CCECParser::GetNextCommand(cec_command *command)
 {
-  return m_bRunning ? m_commandBuffer.Pop(*command) : false;
+  return IsRunning() ? m_commandBuffer.Pop(*command) : false;
 }
 //@}
 
@@ -367,8 +357,11 @@ void CCECParser::BroadcastActiveSource(void)
 
 bool CCECParser::TransmitFormatted(const cec_frame &data, bool bWaitForAck /* = true */)
 {
+  CLockObject lock(&m_mutex);
   if (!m_communication || !m_communication->Write(data))
+  {
     return false;
+  }
 
   CCondition::Sleep((int) data.size() * 24 /*data*/ + 5 /*start bit (4.5 ms)*/ + 50 /* to be on the safe side */);
   if (bWaitForAck && !WaitForAck())
@@ -485,10 +478,12 @@ bool CCECParser::WaitForAck(int iTimeout /* = 1000 */)
   return bGotAck && !bError;
 }
 
-void CCECParser::ParseMessage(cec_frame &msg)
+bool CCECParser::ParseMessage(cec_frame &msg)
 {
+  bool bReturn(false);
+
   if (msg.empty())
-    return;
+    return bReturn;
 
   CStdString logStr;
   uint8_t iCode = msg[0] & ~(MSGCODE_FRAME_EOM | MSGCODE_FRAME_ACK);
@@ -545,11 +540,13 @@ void CCECParser::ParseMessage(cec_frame &msg)
       AddLog(CEC_LOG_DEBUG, logStr.c_str());
     }
     if (bEom)
-      ParseCurrentFrame();
+      bReturn = true;
     break;
   default:
     break;
   }
+
+  return bReturn;
 }
 
 void CCECParser::ParseCurrentFrame(void)
index 7b369df474588493b4d94f6b4a2061cb0d1977dc..c70cc481f0a4aa79c464943d5f92f6f0de417c0b 100644 (file)
@@ -35,6 +35,7 @@
 #include <stdio.h>
 #include "../../include/CECExports.h"
 #include "../../include/CECTypes.h"
+#include "util/threads.h"
 #include "util/buffer.h"
 
 class CSerialPort;
@@ -43,7 +44,7 @@ namespace CEC
 {
   class CCommunication;
 
-  class CCECParser : public ICECDevice
+  class CCECParser : public ICECDevice, public CThread
   {
     public:
     /*!
@@ -73,8 +74,7 @@ namespace CEC
       virtual int  GetLibVersion(void);
     //@}
 
-      static void *ThreadHandler(CCECParser *parser);
-      bool Process(void);
+      void *Process(void);
       void AddLog(cec_log_level level, const std::string &strMessage);
     protected:
       virtual bool TransmitFormatted(const cec_frame &data, bool bWaitForAck = true);
@@ -95,7 +95,7 @@ namespace CEC
       bool ReadFromDevice(int iTimeout);
       void ProcessMessages(void);
       bool GetMessage(cec_frame &msg, bool bFromBuffer = true);
-      void ParseMessage(cec_frame &msg);
+      bool ParseMessage(cec_frame &msg);
       void ParseCurrentFrame(void);
 
       void AddData(uint8_t* data, int len);
@@ -113,10 +113,7 @@ namespace CEC
       CecBuffer<cec_keypress>    m_keyBuffer;
       CecBuffer<cec_command>     m_commandBuffer;
       std::string                m_strDeviceName;
-      pthread_t                  m_thread;
       CMutex                     m_mutex;
-      CCondition                 m_exitCondition;
-      bool                       m_bRunning;
       CCommunication            *m_communication;
   };
 };
index f0000466013977f14e73fad2bb8d3879e446def7..fd86b539aae20bb6d91166ec16d6b84328fea6a8 100644 (file)
@@ -182,7 +182,7 @@ bool CCommunication::Read(cec_frame &msg, int iTimeout)
 {
   CLockObject lock(&m_bufferMutex);
 
-  while (m_iInbufUsed < 1)
+  if (m_iInbufUsed < 1)
     m_condition.Wait(&m_bufferMutex, iTimeout);
 
   if (m_iInbufUsed < 1)
index 0e188c3fb71ff206e689bda4846fb273d8fef219..5e08b843624ed925c34b1129d013966bfa87b22f 100644 (file)
 CMutex::CMutex(void)
 {
   pthread_mutex_init(&m_mutex, NULL);
-  m_condition = new CCondition();
-  m_bLocked = false;
 }
 
 CMutex::~CMutex(void)
 {
-  delete m_condition;
   pthread_mutex_destroy(&m_mutex);
 }
 
-bool CMutex::TryLock(int64_t iTimeout)
+bool CMutex::TryLock(void)
 {
-  m_bLocked = (pthread_mutex_trylock(&m_mutex) == 0);
-  if (!m_bLocked)
-  {
-    if (m_condition->Wait(this, iTimeout))
-      m_bLocked = (pthread_mutex_trylock(&m_mutex) == 0);
-  }
-
-  return m_bLocked;
+  return (pthread_mutex_trylock(&m_mutex) == 0);
 }
 
 bool CMutex::Lock(void)
 {
-  m_bLocked = (pthread_mutex_lock(&m_mutex) == 0);
-  return m_bLocked;
+  return (pthread_mutex_lock(&m_mutex) == 0);
 }
 
 void CMutex::Unlock(void)
 {
   pthread_mutex_unlock(&m_mutex);
-  m_bLocked = false;
-  m_condition->Signal();
 }
 
-CLockObject::CLockObject(CMutex *mutex, int64_t iTimeout /* = -1 */) :
-  m_mutex(mutex),
-  m_bLocked(false)
+CLockObject::CLockObject(CMutex *mutex) :
+  m_mutex(mutex)
 {
-  if (iTimeout > 0)
-    m_bLocked = m_mutex->TryLock(iTimeout);
-  else
-    m_bLocked = m_mutex->Lock();
+  m_mutex->Lock();
 }
 
 CLockObject::~CLockObject(void)
@@ -90,19 +73,16 @@ CLockObject::~CLockObject(void)
 void CLockObject::Leave(void)
 {
   m_mutex->Unlock();
-  m_bLocked = false;
 }
 
 void CLockObject::Lock(void)
 {
   m_mutex->Lock();
-  m_bLocked = true;
 }
 
 CCondition::CCondition(void)
 {
   pthread_cond_init(&m_cond, NULL);
-  m_bSignaled = false;
 }
 
 CCondition::~CCondition(void)
@@ -118,6 +98,7 @@ void CCondition::Signal(void)
 
 bool CCondition::Wait(CMutex *mutex, int64_t iTimeout)
 {
+  bool bReturn(false);
   struct timespec abstime;
   struct timeval now;
   if (gettimeofday(&now, NULL) == 0)
@@ -125,14 +106,9 @@ bool CCondition::Wait(CMutex *mutex, int64_t iTimeout)
     iTimeout       += now.tv_usec / 1000;
     abstime.tv_sec  = now.tv_sec + (time_t)(iTimeout / 1000);
     abstime.tv_nsec = (long)((iTimeout % (unsigned long)1000) * (unsigned long)1000000);
-    m_bSignaled     = (pthread_cond_timedwait(&m_cond, &mutex->m_mutex, &abstime) == 0);
-    if (!m_bSignaled)
-      pthread_mutex_unlock(&mutex->m_mutex);
+    bReturn         = (pthread_cond_timedwait(&m_cond, &mutex->m_mutex, &abstime) == 0);
   }
 
-  bool bReturn = m_bSignaled;
-  m_bSignaled = false;
-
   return bReturn;
 }
 
@@ -141,5 +117,46 @@ void CCondition::Sleep(int iTimeout)
   sched_yield();
   CCondition w;
   CMutex m;
+  CLockObject lock(&m);
   w.Wait(&m, iTimeout);
 }
+
+CThread::CThread(void) :
+    m_bRunning(false),
+    m_bStop(false)
+{
+}
+
+CThread::~CThread(void)
+{
+  m_bStop = true;
+  pthread_join(m_thread, NULL);
+}
+
+bool CThread::CreateThread(void)
+{
+  bool bReturn(false);
+
+  if (!m_bRunning && pthread_create(&m_thread, NULL, (void *(*) (void *))&CThread::ThreadHandler, (void *)this) == 0)
+  {
+    m_bRunning = true;
+    pthread_detach(m_thread);
+    bReturn = true;
+  }
+
+  return bReturn;
+}
+
+void *CThread::ThreadHandler(CThread *thread)
+{
+  if (thread)
+    return thread->Process();
+  return NULL;
+}
+
+bool CThread::StopThread(bool bWaitForExit /* = true */)
+{
+  m_bStop = true;
+  if (bWaitForExit)
+    pthread_join(m_thread, NULL);
+}
index 6f6288c4530aa4c99f18346306965baf17d0f5f7..5c05f4180d4e936289ae9edcd67bda9b1c050981 100644 (file)
@@ -39,14 +39,13 @@ class CCondition
 {
 public:
   CCondition(void);
-  ~CCondition(void);
+  virtual ~CCondition(void);
 
   void Signal(void);
   bool Wait(CMutex *mutex, int64_t iTimeout);
   static void Sleep(int iTimeout);
 
 private:
-  bool            m_bSignaled;
   pthread_cond_t  m_cond;
 };
 
@@ -56,20 +55,17 @@ public:
   CMutex(void);
   virtual ~CMutex(void);
 
-  bool TryLock(int64_t iTimeout);
+  bool TryLock(void);
   bool Lock(void);
   void Unlock(void);
-  bool IsLocked(void) const { return m_bLocked; }
 
   pthread_mutex_t m_mutex;
-  CCondition     *m_condition;
-  bool            m_bLocked;
 };
 
 class CLockObject
 {
 public:
-  CLockObject(CMutex *mutex, int64_t iTimeout = -1);
+  CLockObject(CMutex *mutex);
   ~CLockObject(void);
 
   bool IsLocked(void) const { return m_bLocked; }
@@ -80,3 +76,22 @@ private:
   CMutex *m_mutex;
   bool    m_bLocked;
 };
+
+class CThread
+{
+public:
+  CThread(void);
+  virtual ~CThread(void);
+
+  virtual bool IsRunning(void) const { return m_bRunning; }
+  virtual bool CreateThread(void);
+  virtual bool StopThread(bool bWaitForExit = true);
+
+  static void *ThreadHandler(CThread *thread);
+  virtual void *Process(void) = 0;
+
+protected:
+  pthread_t m_thread;
+  bool      m_bRunning;
+  bool      m_bStop;
+};