From 60fa45780df645538897eb092a736498105ee8c2 Mon Sep 17 00:00:00 2001 From: Lars Op den Kamp Date: Mon, 3 Oct 2011 21:40:00 +0200 Subject: [PATCH] cec: extracted a thread class. fixed bug: pthread_cond_wait was called without the mutex locked. fixed possible deadlock: don't send messages and wait for an ack with the mutex locked in CCECParser. removed some obsolete code. --- src/lib/CECParser.cpp | 73 +++++++++++++++++------------------ src/lib/CECParser.h | 11 ++---- src/lib/Communication.cpp | 2 +- src/lib/util/threads.cpp | 81 +++++++++++++++++++++++---------------- src/lib/util/threads.h | 29 ++++++++++---- 5 files changed, 111 insertions(+), 85 deletions(-) diff --git a/src/lib/CECParser.cpp b/src/lib/CECParser.cpp index 7a6d871..5f24286 100644 --- a/src/lib/CECParser.cpp +++ b/src/lib/CECParser.cpp @@ -58,8 +58,7 @@ CCECParser::CCECParser(const char *strDeviceName, cec_logical_address iLogicalAd m_iCurrentButton(CEC_USER_CONTROL_CODE_UNKNOWN), m_physicaladdress(iPhysicalAddress), m_iLogicalAddress(iLogicalAddress), - m_strDeviceName(strDeviceName), - m_bRunning(false) + m_strDeviceName(strDeviceName) { m_communication = new CCommunication(this); } @@ -94,43 +93,36 @@ bool CCECParser::Open(const char *strPort, int iTimeoutMs /* = 10000 */) return false; } - if (pthread_create(&m_thread, NULL, (void *(*) (void *))&CCECParser::ThreadHandler, (void *)this) == 0) - { - m_bRunning = true; - AddLog(CEC_LOG_DEBUG, "processor thread created"); - pthread_detach(m_thread); + if (CreateThread()) return true; - } else - { AddLog(CEC_LOG_ERROR, "could not create a processor thread"); - m_bRunning = false; - } return false; } void CCECParser::Close(void) { - m_bRunning = false; - pthread_join(m_thread, NULL); + StopThread(); } -void *CCECParser::ThreadHandler(CCECParser *parser) +void *CCECParser::Process(void) { - if (parser) - parser->Process(); - return 0; -} + AddLog(CEC_LOG_DEBUG, "processor thread started"); -bool CCECParser::Process(void) -{ int64_t now = GetTimeMs(); - while (m_bRunning) + while (!m_bStop) { - cec_frame msg; - while (m_bRunning && m_communication->IsOpen() && m_communication->Read(msg, CEC_BUTTON_TIMEOUT)) - ParseMessage(msg); + bool bParseFrame(false); + { + CLockObject lock(&m_mutex); + cec_frame msg; + if (!m_bStop && m_communication->IsOpen() && m_communication->Read(msg, CEC_BUTTON_TIMEOUT)) + bParseFrame = ParseMessage(msg); + } + + if (bParseFrame) + ParseCurrentFrame(); now = GetTimeMs(); CheckKeypressTimeout(now); @@ -138,14 +130,12 @@ bool CCECParser::Process(void) } AddLog(CEC_LOG_DEBUG, "processor thread terminated"); - m_bRunning = false; - m_exitCondition.Signal(); - return true; + return NULL; } bool CCECParser::Ping(void) { - if (!m_bRunning) + if (!IsRunning()) return false; AddLog(CEC_LOG_DEBUG, "sending ping"); @@ -168,7 +158,7 @@ bool CCECParser::Ping(void) bool CCECParser::StartBootloader(void) { - if (!m_bRunning) + if (!IsRunning()) return false; AddLog(CEC_LOG_DEBUG, "starting the bootloader"); @@ -199,7 +189,7 @@ bool CCECParser::PowerOffDevices(cec_logical_address address /* = CECDEVICE_BROA bool CCECParser::PowerOnDevices(cec_logical_address address /* = CECDEVICE_TV */) { - if (!m_bRunning) + if (!IsRunning()) return false; CStdString strLog; @@ -213,7 +203,7 @@ bool CCECParser::PowerOnDevices(cec_logical_address address /* = CECDEVICE_TV */ bool CCECParser::StandbyDevices(cec_logical_address address /* = CECDEVICE_BROADCAST */) { - if (!m_bRunning) + if (!IsRunning()) return false; CStdString strLog; @@ -227,7 +217,7 @@ bool CCECParser::StandbyDevices(cec_logical_address address /* = CECDEVICE_BROAD bool CCECParser::SetActiveView(void) { - if (!m_bRunning) + if (!IsRunning()) return false; AddLog(CEC_LOG_DEBUG, "setting active view"); @@ -241,7 +231,7 @@ bool CCECParser::SetActiveView(void) bool CCECParser::SetInactiveView(void) { - if (!m_bRunning) + if (!IsRunning()) return false; AddLog(CEC_LOG_DEBUG, "setting inactive view"); @@ -260,12 +250,12 @@ bool CCECParser::GetNextLogMessage(cec_log_message *message) bool CCECParser::GetNextKeypress(cec_keypress *key) { - return m_bRunning ? m_keyBuffer.Pop(*key) : false; + return IsRunning() ? m_keyBuffer.Pop(*key) : false; } bool CCECParser::GetNextCommand(cec_command *command) { - return m_bRunning ? m_commandBuffer.Pop(*command) : false; + return IsRunning() ? m_commandBuffer.Pop(*command) : false; } //@} @@ -367,8 +357,11 @@ void CCECParser::BroadcastActiveSource(void) bool CCECParser::TransmitFormatted(const cec_frame &data, bool bWaitForAck /* = true */) { + CLockObject lock(&m_mutex); if (!m_communication || !m_communication->Write(data)) + { return false; + } CCondition::Sleep((int) data.size() * 24 /*data*/ + 5 /*start bit (4.5 ms)*/ + 50 /* to be on the safe side */); if (bWaitForAck && !WaitForAck()) @@ -485,10 +478,12 @@ bool CCECParser::WaitForAck(int iTimeout /* = 1000 */) return bGotAck && !bError; } -void CCECParser::ParseMessage(cec_frame &msg) +bool CCECParser::ParseMessage(cec_frame &msg) { + bool bReturn(false); + if (msg.empty()) - return; + return bReturn; CStdString logStr; uint8_t iCode = msg[0] & ~(MSGCODE_FRAME_EOM | MSGCODE_FRAME_ACK); @@ -545,11 +540,13 @@ void CCECParser::ParseMessage(cec_frame &msg) AddLog(CEC_LOG_DEBUG, logStr.c_str()); } if (bEom) - ParseCurrentFrame(); + bReturn = true; break; default: break; } + + return bReturn; } void CCECParser::ParseCurrentFrame(void) diff --git a/src/lib/CECParser.h b/src/lib/CECParser.h index 7b369df..c70cc48 100644 --- a/src/lib/CECParser.h +++ b/src/lib/CECParser.h @@ -35,6 +35,7 @@ #include #include "../../include/CECExports.h" #include "../../include/CECTypes.h" +#include "util/threads.h" #include "util/buffer.h" class CSerialPort; @@ -43,7 +44,7 @@ namespace CEC { class CCommunication; - class CCECParser : public ICECDevice + class CCECParser : public ICECDevice, public CThread { public: /*! @@ -73,8 +74,7 @@ namespace CEC virtual int GetLibVersion(void); //@} - static void *ThreadHandler(CCECParser *parser); - bool Process(void); + void *Process(void); void AddLog(cec_log_level level, const std::string &strMessage); protected: virtual bool TransmitFormatted(const cec_frame &data, bool bWaitForAck = true); @@ -95,7 +95,7 @@ namespace CEC bool ReadFromDevice(int iTimeout); void ProcessMessages(void); bool GetMessage(cec_frame &msg, bool bFromBuffer = true); - void ParseMessage(cec_frame &msg); + bool ParseMessage(cec_frame &msg); void ParseCurrentFrame(void); void AddData(uint8_t* data, int len); @@ -113,10 +113,7 @@ namespace CEC CecBuffer m_keyBuffer; CecBuffer m_commandBuffer; std::string m_strDeviceName; - pthread_t m_thread; CMutex m_mutex; - CCondition m_exitCondition; - bool m_bRunning; CCommunication *m_communication; }; }; diff --git a/src/lib/Communication.cpp b/src/lib/Communication.cpp index f000046..fd86b53 100644 --- a/src/lib/Communication.cpp +++ b/src/lib/Communication.cpp @@ -182,7 +182,7 @@ bool CCommunication::Read(cec_frame &msg, int iTimeout) { CLockObject lock(&m_bufferMutex); - while (m_iInbufUsed < 1) + if (m_iInbufUsed < 1) m_condition.Wait(&m_bufferMutex, iTimeout); if (m_iInbufUsed < 1) diff --git a/src/lib/util/threads.cpp b/src/lib/util/threads.cpp index 0e188c3..5e08b84 100644 --- a/src/lib/util/threads.cpp +++ b/src/lib/util/threads.cpp @@ -36,49 +36,32 @@ CMutex::CMutex(void) { pthread_mutex_init(&m_mutex, NULL); - m_condition = new CCondition(); - m_bLocked = false; } CMutex::~CMutex(void) { - delete m_condition; pthread_mutex_destroy(&m_mutex); } -bool CMutex::TryLock(int64_t iTimeout) +bool CMutex::TryLock(void) { - m_bLocked = (pthread_mutex_trylock(&m_mutex) == 0); - if (!m_bLocked) - { - if (m_condition->Wait(this, iTimeout)) - m_bLocked = (pthread_mutex_trylock(&m_mutex) == 0); - } - - return m_bLocked; + return (pthread_mutex_trylock(&m_mutex) == 0); } bool CMutex::Lock(void) { - m_bLocked = (pthread_mutex_lock(&m_mutex) == 0); - return m_bLocked; + return (pthread_mutex_lock(&m_mutex) == 0); } void CMutex::Unlock(void) { pthread_mutex_unlock(&m_mutex); - m_bLocked = false; - m_condition->Signal(); } -CLockObject::CLockObject(CMutex *mutex, int64_t iTimeout /* = -1 */) : - m_mutex(mutex), - m_bLocked(false) +CLockObject::CLockObject(CMutex *mutex) : + m_mutex(mutex) { - if (iTimeout > 0) - m_bLocked = m_mutex->TryLock(iTimeout); - else - m_bLocked = m_mutex->Lock(); + m_mutex->Lock(); } CLockObject::~CLockObject(void) @@ -90,19 +73,16 @@ CLockObject::~CLockObject(void) void CLockObject::Leave(void) { m_mutex->Unlock(); - m_bLocked = false; } void CLockObject::Lock(void) { m_mutex->Lock(); - m_bLocked = true; } CCondition::CCondition(void) { pthread_cond_init(&m_cond, NULL); - m_bSignaled = false; } CCondition::~CCondition(void) @@ -118,6 +98,7 @@ void CCondition::Signal(void) bool CCondition::Wait(CMutex *mutex, int64_t iTimeout) { + bool bReturn(false); struct timespec abstime; struct timeval now; if (gettimeofday(&now, NULL) == 0) @@ -125,14 +106,9 @@ bool CCondition::Wait(CMutex *mutex, int64_t iTimeout) iTimeout += now.tv_usec / 1000; abstime.tv_sec = now.tv_sec + (time_t)(iTimeout / 1000); abstime.tv_nsec = (long)((iTimeout % (unsigned long)1000) * (unsigned long)1000000); - m_bSignaled = (pthread_cond_timedwait(&m_cond, &mutex->m_mutex, &abstime) == 0); - if (!m_bSignaled) - pthread_mutex_unlock(&mutex->m_mutex); + bReturn = (pthread_cond_timedwait(&m_cond, &mutex->m_mutex, &abstime) == 0); } - bool bReturn = m_bSignaled; - m_bSignaled = false; - return bReturn; } @@ -141,5 +117,46 @@ void CCondition::Sleep(int iTimeout) sched_yield(); CCondition w; CMutex m; + CLockObject lock(&m); w.Wait(&m, iTimeout); } + +CThread::CThread(void) : + m_bRunning(false), + m_bStop(false) +{ +} + +CThread::~CThread(void) +{ + m_bStop = true; + pthread_join(m_thread, NULL); +} + +bool CThread::CreateThread(void) +{ + bool bReturn(false); + + if (!m_bRunning && pthread_create(&m_thread, NULL, (void *(*) (void *))&CThread::ThreadHandler, (void *)this) == 0) + { + m_bRunning = true; + pthread_detach(m_thread); + bReturn = true; + } + + return bReturn; +} + +void *CThread::ThreadHandler(CThread *thread) +{ + if (thread) + return thread->Process(); + return NULL; +} + +bool CThread::StopThread(bool bWaitForExit /* = true */) +{ + m_bStop = true; + if (bWaitForExit) + pthread_join(m_thread, NULL); +} diff --git a/src/lib/util/threads.h b/src/lib/util/threads.h index 6f6288c..5c05f41 100644 --- a/src/lib/util/threads.h +++ b/src/lib/util/threads.h @@ -39,14 +39,13 @@ class CCondition { public: CCondition(void); - ~CCondition(void); + virtual ~CCondition(void); void Signal(void); bool Wait(CMutex *mutex, int64_t iTimeout); static void Sleep(int iTimeout); private: - bool m_bSignaled; pthread_cond_t m_cond; }; @@ -56,20 +55,17 @@ public: CMutex(void); virtual ~CMutex(void); - bool TryLock(int64_t iTimeout); + bool TryLock(void); bool Lock(void); void Unlock(void); - bool IsLocked(void) const { return m_bLocked; } pthread_mutex_t m_mutex; - CCondition *m_condition; - bool m_bLocked; }; class CLockObject { public: - CLockObject(CMutex *mutex, int64_t iTimeout = -1); + CLockObject(CMutex *mutex); ~CLockObject(void); bool IsLocked(void) const { return m_bLocked; } @@ -80,3 +76,22 @@ private: CMutex *m_mutex; bool m_bLocked; }; + +class CThread +{ +public: + CThread(void); + virtual ~CThread(void); + + virtual bool IsRunning(void) const { return m_bRunning; } + virtual bool CreateThread(void); + virtual bool StopThread(bool bWaitForExit = true); + + static void *ThreadHandler(CThread *thread); + virtual void *Process(void) = 0; + +protected: + pthread_t m_thread; + bool m_bRunning; + bool m_bStop; +}; -- 2.34.1