From 960f33c651b2dd1e6331dafe5b21705c11cee1a2 Mon Sep 17 00:00:00 2001 From: Lars Op den Kamp Date: Sat, 11 Feb 2012 19:50:50 +0100 Subject: [PATCH] cec: and now proper predicates --- src/cec-config/cec-config.cpp | 15 +-- .../adapter/USBCECAdapterCommunication.cpp | 14 +- src/lib/adapter/USBCECAdapterCommunication.h | 3 +- src/lib/adapter/USBCECAdapterMessage.h | 23 ++-- src/lib/implementations/CECCommandHandler.cpp | 10 +- src/lib/implementations/CECCommandHandler.h | 25 ++-- src/lib/platform/posix/os-threads.h | 17 ++- src/lib/platform/sockets/socket.h | 28 ++-- src/lib/platform/threads/mutex.h | 123 +++++++++++++++--- src/lib/platform/threads/threads.h | 39 +++--- src/lib/platform/util/timeutils.h | 16 +++ src/testclient/main.cpp | 2 +- 12 files changed, 215 insertions(+), 100 deletions(-) diff --git a/src/cec-config/cec-config.cpp b/src/cec-config/cec-config.cpp index a853f91..3fc7af3 100644 --- a/src/cec-config/cec-config.cpp +++ b/src/cec-config/cec-config.cpp @@ -50,12 +50,10 @@ using namespace PLATFORM; CMutex g_outputMutex; -CMutex g_responseMutex; -CCondition g_responseCondtion; +CEvent g_responseEvent; cec_opcode g_lastCommand = CEC_OPCODE_NONE; -CMutex g_keyMutex; -CCondition g_keyCondtion; +CEvent g_keyEvent; cec_user_control_code g_lastKey = CEC_USER_CONTROL_CODE_UNKNOWN; ICECCallbacks g_callbacks; @@ -129,17 +127,15 @@ int CecLogMessage(void *UNUSED(cbParam), const cec_log_message &message) int CecKeyPress(void *UNUSED(cbParam), const cec_keypress &key) { - CLockObject lock(g_keyMutex); g_lastKey = key.keycode; - g_keyCondtion.Signal(); + g_keyEvent.Signal(); return 0; } int CecCommand(void *UNUSED(cbParam), const cec_command &command) { - CLockObject lock(g_responseMutex); g_lastCommand = command.opcode; - g_responseCondtion.Signal(); + g_responseEvent.Signal(); return 0; } @@ -326,8 +322,7 @@ bool PowerOnTV(uint64_t iTimeout = 60000) g_parser->PowerOnDevices(CECDEVICE_TV); while (iTarget > iNow) { - CLockObject lock(g_responseMutex); - g_responseCondtion.Wait(g_responseMutex, (uint32_t)(iTarget - iNow)); + g_responseEvent.Wait((uint32_t)(iTarget - iNow)); if (g_lastCommand == CEC_OPCODE_REQUEST_ACTIVE_SOURCE) break; iNow = GetTimeMs(); diff --git a/src/lib/adapter/USBCECAdapterCommunication.cpp b/src/lib/adapter/USBCECAdapterCommunication.cpp index 32158de..8e0b44e 100644 --- a/src/lib/adapter/USBCECAdapterCommunication.cpp +++ b/src/lib/adapter/USBCECAdapterCommunication.cpp @@ -43,6 +43,7 @@ using namespace PLATFORM; CUSBCECAdapterCommunication::CUSBCECAdapterCommunication(CCECProcessor *processor, const char *strPort, uint16_t iBaudRate /* = 38400 */) : m_port(NULL), m_processor(processor), + m_bHasData(false), m_iLineTimeout(0), m_iFirmwareVersion(CEC_FW_VERSION_UNKNOWN), m_lastInitiator(CECDEVICE_UNKNOWN), @@ -174,6 +175,7 @@ void CUSBCECAdapterCommunication::Close(void) { SetAckMask(0); CLockObject lock(m_mutex); + m_bHasData = true; m_rcvCondition.Broadcast(); StopThread(); } @@ -203,7 +205,7 @@ void *CUSBCECAdapterCommunication::Process(void) CCECAdapterMessage *msg(NULL); if (m_outBuffer.Pop(msg)) - msg->condition.Broadcast(); + msg->event.Broadcast(); if (m_port) { @@ -247,10 +249,9 @@ cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command & bool CUSBCECAdapterCommunication::Write(CCECAdapterMessage *data) { - CLockObject lock(data->mutex); data->state = ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT; m_outBuffer.Push(data); - data->condition.Wait(data->mutex); + data->event.Wait(); if ((data->expectControllerAck && data->state != ADAPTER_MESSAGE_STATE_SENT_ACKED) || (!data->expectControllerAck && data->state != ADAPTER_MESSAGE_STATE_SENT)) @@ -289,9 +290,10 @@ bool CUSBCECAdapterCommunication::Read(CCECAdapterMessage &msg, uint32_t iTimeou if (!m_inBuffer.Pop(buf)) { - if (iTimeout == 0 || !m_rcvCondition.Wait(m_mutex, iTimeout)) + if (iTimeout == 0 || !m_rcvCondition.Wait(m_mutex, m_bHasData, iTimeout)) return false; m_inBuffer.Pop(buf); + m_bHasData = m_inBuffer.Size() > 0; } if (buf) @@ -626,6 +628,7 @@ void CUSBCECAdapterCommunication::AddData(uint8_t *data, size_t iLen) m_currentAdapterMessage.Clear(); m_bGotStart = false; m_bNextIsEscaped = false; + m_bHasData = true; m_rcvCondition.Signal(); } else if (m_bNextIsEscaped) @@ -671,7 +674,6 @@ bool CUSBCECAdapterCommunication::ReadFromDevice(uint32_t iTimeout, size_t iSize void CUSBCECAdapterCommunication::SendMessageToAdapter(CCECAdapterMessage *msg) { CLockObject adapterLock(m_mutex); - CLockObject lock(msg->mutex); if (msg->tries == 1) SetLineTimeout(msg->lineTimeout); else @@ -693,7 +695,7 @@ void CUSBCECAdapterCommunication::SendMessageToAdapter(CCECAdapterMessage *msg) CLibCEC::AddLog(CEC_LOG_DEBUG, "did not receive ack"); } } - msg->condition.Signal(); + msg->event.Signal(); } void CUSBCECAdapterCommunication::WriteNextCommand(void) diff --git a/src/lib/adapter/USBCECAdapterCommunication.h b/src/lib/adapter/USBCECAdapterCommunication.h index 659e659..d2de30e 100644 --- a/src/lib/adapter/USBCECAdapterCommunication.h +++ b/src/lib/adapter/USBCECAdapterCommunication.h @@ -85,7 +85,8 @@ namespace CEC PLATFORM::SyncedBuffer m_inBuffer; PLATFORM::SyncedBuffer m_outBuffer; PLATFORM::CMutex m_mutex; - PLATFORM::CCondition m_rcvCondition; + PLATFORM::CCondition m_rcvCondition; + volatile bool m_bHasData; uint8_t m_iLineTimeout; uint16_t m_iFirmwareVersion; cec_command m_currentframe; diff --git a/src/lib/adapter/USBCECAdapterMessage.h b/src/lib/adapter/USBCECAdapterMessage.h index ae41177..d3f5d3e 100644 --- a/src/lib/adapter/USBCECAdapterMessage.h +++ b/src/lib/adapter/USBCECAdapterMessage.h @@ -346,17 +346,16 @@ namespace CEC CECDEVICE_UNKNOWN; } - uint8_t maxTries; - uint8_t tries; - cec_adapter_messagecode reply; - cec_datapacket packet; - cec_adapter_message_state state; - int32_t transmit_timeout; - bool isTransmission; - bool expectControllerAck; - uint8_t lineTimeout; - uint8_t retryTimeout; - PLATFORM::CMutex mutex; - PLATFORM::CCondition condition; + uint8_t maxTries; + uint8_t tries; + cec_adapter_messagecode reply; + cec_datapacket packet; + cec_adapter_message_state state; + int32_t transmit_timeout; + bool isTransmission; + bool expectControllerAck; + uint8_t lineTimeout; + uint8_t retryTimeout; + PLATFORM::CEvent event; }; } diff --git a/src/lib/implementations/CECCommandHandler.cpp b/src/lib/implementations/CECCommandHandler.cpp index 0f00627..44cf477 100644 --- a/src/lib/implementations/CECCommandHandler.cpp +++ b/src/lib/implementations/CECCommandHandler.cpp @@ -51,7 +51,8 @@ CCECCommandHandler::CCECCommandHandler(CCECBusDevice *busDevice) : m_iUseCounter(0), m_expectedResponse(CEC_OPCODE_NONE), m_bOPTSendDeckStatusUpdateOnActiveSource(false), - m_vendorId(CEC_VENDOR_UNKNOWN) + m_vendorId(CEC_VENDOR_UNKNOWN), + m_bRcvSignal(false) { } @@ -195,7 +196,10 @@ bool CCECCommandHandler::HandleCommand(const cec_command &command) CLockObject lock(m_receiveMutex); if (m_expectedResponse == CEC_OPCODE_NONE || m_expectedResponse == command.opcode) + { + m_bRcvSignal = true; m_condition.Signal(); + } } MarkReady(); @@ -962,7 +966,9 @@ bool CCECCommandHandler::Transmit(cec_command &command, bool bExpectResponse /* { CLibCEC::AddLog(CEC_LOG_DEBUG, "command transmitted"); if (bExpectResponse) - bReturn = m_condition.Wait(m_receiveMutex, m_iTransmitWait); + bReturn = m_condition.Wait(m_receiveMutex, m_bRcvSignal, m_iTransmitWait); + if (bReturn) + m_bRcvSignal = false; } } --m_iUseCounter; diff --git a/src/lib/implementations/CECCommandHandler.h b/src/lib/implementations/CECCommandHandler.h index be553a9..d3cbe3e 100644 --- a/src/lib/implementations/CECCommandHandler.h +++ b/src/lib/implementations/CECCommandHandler.h @@ -134,17 +134,18 @@ namespace CEC virtual bool Transmit(cec_command &command, bool bExpectResponse = true, cec_opcode expectedResponse = CEC_OPCODE_NONE); - CCECBusDevice * m_busDevice; - CCECProcessor * m_processor; - int32_t m_iTransmitTimeout; - int32_t m_iTransmitWait; - int8_t m_iTransmitRetries; - bool m_bHandlerInited; - uint8_t m_iUseCounter; - cec_opcode m_expectedResponse; - bool m_bOPTSendDeckStatusUpdateOnActiveSource; - cec_vendor_id m_vendorId; - PLATFORM::CMutex m_receiveMutex; - PLATFORM::CCondition m_condition; + CCECBusDevice * m_busDevice; + CCECProcessor * m_processor; + int32_t m_iTransmitTimeout; + int32_t m_iTransmitWait; + int8_t m_iTransmitRetries; + bool m_bHandlerInited; + uint8_t m_iUseCounter; + cec_opcode m_expectedResponse; + bool m_bOPTSendDeckStatusUpdateOnActiveSource; + cec_vendor_id m_vendorId; + PLATFORM::CMutex m_receiveMutex; + PLATFORM::CCondition m_condition; + volatile bool m_bRcvSignal; }; }; diff --git a/src/lib/platform/posix/os-threads.h b/src/lib/platform/posix/os-threads.h index 48e1e35..5216ffe 100644 --- a/src/lib/platform/posix/os-threads.h +++ b/src/lib/platform/posix/os-threads.h @@ -92,17 +92,22 @@ namespace PLATFORM pthread_cond_broadcast(&m_condition); } - bool Wait(mutex_t &mutex, uint32_t iTimeoutMs) + bool Wait(mutex_t &mutex) { sched_yield(); - if (iTimeoutMs > 0) - { - struct timespec timeout = GetAbsTime(iTimeoutMs); - return (pthread_cond_timedwait(&m_condition, &mutex, &timeout) == 0); - } return (pthread_cond_wait(&m_condition, &mutex) == 0); } + bool Wait(mutex_t &mutex, uint32_t iTimeoutMs) + { + if (iTimeoutMs == 0) + return Wait(mutex); + + sched_yield(); + struct timespec timeout = GetAbsTime(iTimeoutMs); + return (pthread_cond_timedwait(&m_condition, &mutex, &timeout) == 0); + } + pthread_cond_t m_condition; }; } diff --git a/src/lib/platform/sockets/socket.h b/src/lib/platform/sockets/socket.h index 58a3e3a..369fbe4 100644 --- a/src/lib/platform/sockets/socket.h +++ b/src/lib/platform/sockets/socket.h @@ -105,7 +105,7 @@ namespace PLATFORM public: CProtectedSocket(_Socket *socket) : m_socket(socket), - m_iUseCount(0) {} + m_bIsIdle(true) {} virtual ~CProtectedSocket(void) { @@ -151,13 +151,13 @@ namespace PLATFORM virtual bool IsBusy(void) { CLockObject lock(m_mutex); - return m_socket && m_iUseCount > 0; + return m_socket && !m_bIsIdle; } - virtual int GetUseCount(void) + virtual bool IsIdle(void) { CLockObject lock(m_mutex); - return m_iUseCount; + return m_socket && m_bIsIdle; } virtual ssize_t Write(void* data, size_t len) @@ -208,27 +208,21 @@ namespace PLATFORM bool WaitReady(void) { CLockObject lock(m_mutex); - if (m_iUseCount > 0) - m_condition.Wait(m_mutex); - - if (m_iUseCount > 0) - return false; - - ++m_iUseCount; + m_condition.Wait(m_mutex, m_bIsIdle); + m_bIsIdle = false; return true; } void MarkReady(void) { CLockObject lock(m_mutex); - if (m_iUseCount > 0) - --m_iUseCount; + m_bIsIdle = true; m_condition.Signal(); } - _Socket *m_socket; - CMutex m_mutex; - CCondition m_condition; - int m_iUseCount; + _Socket * m_socket; + CMutex m_mutex; + CCondition m_condition; + bool m_bIsIdle; }; }; diff --git a/src/lib/platform/threads/mutex.h b/src/lib/platform/threads/mutex.h index e848101..11e3606 100644 --- a/src/lib/platform/threads/mutex.h +++ b/src/lib/platform/threads/mutex.h @@ -39,6 +39,8 @@ #include "../posix/os-threads.h" #endif +#include "../util/timeutils.h" + namespace PLATFORM { class PreventCopy @@ -52,11 +54,13 @@ namespace PLATFORM inline PreventCopy &operator=(const PreventCopy &c){ *this = c; return *this; } }; - class CCondition; + template + class CCondition; class CMutex : public PreventCopy { - friend class CCondition; + template + friend class CCondition; public: inline CMutex(void) : m_iLockCount(0) @@ -221,39 +225,126 @@ namespace PLATFORM volatile bool m_bIsLocked; }; - class CCondition : public PreventCopy + template + class CCondition : public PreventCopy + { + public: + inline CCondition(void) {} + inline ~CCondition(void) + { + m_condition.Broadcast(); + } + + inline void Broadcast(void) + { + m_condition.Broadcast(); + } + + inline void Signal(void) + { + m_condition.Signal(); + } + + inline bool Wait(CMutex &mutex, _Predicate &predicate) + { + while(!predicate) + m_condition.Wait(mutex.m_mutex); + return true; + } + + inline bool Wait(CMutex &mutex, _Predicate &predicate, uint32_t iTimeout) + { + if (iTimeout == 0) + return Wait(mutex, predicate); + + bool bReturn(true); + if (!predicate) + { + CTimeout timeout(iTimeout); + uint64_t iMsLeft(0); + bReturn = false; + while (!bReturn) + { + iMsLeft = timeout.TimeLeft(); + if ((bReturn = iMsLeft == 0 || predicate) == false) + m_condition.Wait(mutex.m_mutex, iMsLeft); + } + } + return bReturn; + } + + private: + CConditionImpl m_condition; + }; + + class CEvent { public: - inline CCondition(void) {} - inline ~CCondition(void) + CEvent(void) : + m_bSignaled(false), + m_bBroadcast(false), + m_iWaitingThreads(0) {} + virtual ~CEvent(void) {} + + void Broadcast(void) { + Set(true); m_condition.Broadcast(); } - inline void Broadcast(void) + void Signal(void) { - m_condition.Broadcast(); + Set(false); + m_condition.Signal(); } - inline void Signal(void) + bool Wait(void) { - m_condition.Signal(); + CLockObject lock(m_mutex); + ++m_iWaitingThreads; + + bool bReturn = m_condition.Wait(m_mutex, m_bSignaled); + return ResetAndReturn() && bReturn; } - inline bool Wait(CMutex &mutex, uint32_t iTimeout = 0) + bool Wait(uint32_t iTimeout) { - return m_condition.Wait(mutex.m_mutex, iTimeout); + if (iTimeout == 0) + return Wait(); + + CLockObject lock(m_mutex); + ++m_iWaitingThreads; + bool bReturn = m_condition.Wait(m_mutex, m_bSignaled, iTimeout); + return ResetAndReturn() && bReturn; } static void Sleep(uint32_t iTimeout) { - CCondition w; - CMutex m; - CLockObject lock(m); - w.Wait(m, iTimeout); + CEvent event; + event.Wait(iTimeout); } private: - CConditionImpl m_condition; + void Set(bool bBroadcast = false) + { + CLockObject lock(m_mutex); + m_bSignaled = true; + m_bBroadcast = bBroadcast; + } + + bool ResetAndReturn(void) + { + CLockObject lock(m_mutex); + bool bReturn(m_bSignaled); + if (bReturn && (--m_iWaitingThreads == 0 || !m_bBroadcast)) + m_bSignaled = false; + return bReturn; + } + + volatile bool m_bSignaled; + CCondition m_condition; + CMutex m_mutex; + volatile bool m_bBroadcast; + unsigned int m_iWaitingThreads; }; } diff --git a/src/lib/platform/threads/threads.h b/src/lib/platform/threads/threads.h index 8fb71fc..f2de310 100644 --- a/src/lib/platform/threads/threads.h +++ b/src/lib/platform/threads/threads.h @@ -40,7 +40,8 @@ namespace PLATFORM public: CThread(void) : m_bStop(false), - m_bRunning(false) {} + m_bRunning(false), + m_bStopped(false) {} virtual ~CThread(void) { @@ -53,17 +54,21 @@ namespace PLATFORM if (thread) { - CLockObject lock(thread->m_threadMutex); - thread->m_bRunning = true; - lock.Unlock(); - thread->m_threadCondition.Broadcast(); + { + CLockObject lock(thread->m_threadMutex); + thread->m_bRunning = true; + thread->m_bStopped = false; + thread->m_threadCondition.Broadcast(); + } retVal = thread->Process(); - lock.Lock(); - thread->m_bRunning = false; - lock.Unlock(); - thread->m_threadCondition.Broadcast(); + { + CLockObject lock(thread->m_threadMutex); + thread->m_bRunning = false; + thread->m_bStopped = true; + thread->m_threadCondition.Broadcast(); + } } return retVal; @@ -91,7 +96,7 @@ namespace PLATFORM if (ThreadsCreate(m_thread, CThread::ThreadHandler, ((void*)static_cast(this)))) { if (bWait) - m_threadCondition.Wait(m_threadMutex); + m_threadCondition.Wait(m_threadMutex, m_bRunning); bReturn = true; } } @@ -106,7 +111,6 @@ namespace PLATFORM CLockObject lock(m_threadMutex); bRunning = IsRunning(); m_bStop = true; - m_threadCondition.Broadcast(); } if (bRunning && bWaitForExit) @@ -120,7 +124,7 @@ namespace PLATFORM virtual bool Sleep(uint32_t iTimeout) { CLockObject lock(m_threadMutex); - return m_bStop ? false : m_threadCondition.Wait(m_threadMutex, iTimeout); + return m_bStop ? false : m_threadCondition.Wait(m_threadMutex, m_bStopped, iTimeout); } virtual void *Process(void) = 0; @@ -129,10 +133,11 @@ namespace PLATFORM void SetRunning(bool bSetTo); private: - bool m_bStop; - bool m_bRunning; - CCondition m_threadCondition; - CMutex m_threadMutex; - thread_t m_thread; + bool m_bStop; + bool m_bRunning; + bool m_bStopped; + CCondition m_threadCondition; + CMutex m_threadMutex; + thread_t m_thread; }; }; diff --git a/src/lib/platform/util/timeutils.h b/src/lib/platform/util/timeutils.h index d2992e7..367c80b 100644 --- a/src/lib/platform/util/timeutils.h +++ b/src/lib/platform/util/timeutils.h @@ -103,4 +103,20 @@ namespace PLATFORM { return (T)GetTimeMs() / (T)1000.0; } + + class CTimeout + { + public: + CTimeout(uint32_t iTime) : + m_iTarget(GetTimeMs() + iTime) {} + + uint64_t TimeLeft(void) const + { + uint64_t iNow = GetTimeMs(); + return (iNow > m_iTarget) ? 0 : m_iTarget - iNow; + } + + private: + uint64_t m_iTarget; + }; }; diff --git a/src/testclient/main.cpp b/src/testclient/main.cpp index dde9980..7517249 100644 --- a/src/testclient/main.cpp +++ b/src/testclient/main.cpp @@ -1127,7 +1127,7 @@ int main (int argc, char *argv[]) g_bExit = true; if (!g_bExit && !g_bHardExit) - CCondition::Sleep(50); + CEvent::Sleep(50); } if (!g_bSingleCommand && !g_bHardExit) -- 2.34.1