From a8559e01ffc0239556097fbbe99f88f663f3fd6f Mon Sep 17 00:00:00 2001 From: Lars Op den Kamp Date: Tue, 24 Apr 2012 21:43:08 +0200 Subject: [PATCH] cec: async writes for CUSBCECAdapterCommunication. less locks and polls, more speed --- src/lib/LibCEC.cpp | 2 +- src/lib/LibCEC.h | 1 + .../adapter/USBCECAdapterCommunication.cpp | 22 +++++++++---- src/lib/adapter/USBCECAdapterMessageQueue.cpp | 33 +++++++++++++++---- src/lib/adapter/USBCECAdapterMessageQueue.h | 8 +++-- src/lib/platform/util/buffer.h | 20 +++++++++-- 6 files changed, 67 insertions(+), 19 deletions(-) diff --git a/src/lib/LibCEC.cpp b/src/lib/LibCEC.cpp index 585091e..a59fae7 100644 --- a/src/lib/LibCEC.cpp +++ b/src/lib/LibCEC.cpp @@ -354,7 +354,7 @@ void CLibCEC::AddLog(const cec_log_level level, const char *strFormat, ...) CLibCEC *instance = CLibCEC::GetInstance(); if (!instance) return; - CLockObject lock(instance->m_mutex); + CLockObject lock(instance->m_logMutex); cec_log_message message; message.level = level; diff --git a/src/lib/LibCEC.h b/src/lib/LibCEC.h index adfb507..e459800 100644 --- a/src/lib/LibCEC.h +++ b/src/lib/LibCEC.h @@ -154,5 +154,6 @@ namespace CEC ICECCallbacks * m_callbacks; void * m_cbParam; PLATFORM::CMutex m_mutex; + PLATFORM::CMutex m_logMutex; }; }; diff --git a/src/lib/adapter/USBCECAdapterCommunication.cpp b/src/lib/adapter/USBCECAdapterCommunication.cpp index b53114c..828ea24 100644 --- a/src/lib/adapter/USBCECAdapterCommunication.cpp +++ b/src/lib/adapter/USBCECAdapterCommunication.cpp @@ -92,7 +92,10 @@ bool CUSBCECAdapterCommunication::Open(uint32_t iTimeoutMs /* = 10000 */, bool b m_commands = new CUSBCECAdapterCommands(this); if (!m_adapterMessageQueue) + { m_adapterMessageQueue = new CCECAdapterMessageQueue(this); + m_adapterMessageQueue->CreateThread(); + } /* try to open the connection */ CStdString strError; @@ -166,6 +169,8 @@ void CUSBCECAdapterCommunication::Close(void) SetControlledMode(false); } + m_adapterMessageQueue->Clear(); + /* stop and delete the ping thread */ if (m_pingThread) m_pingThread->StopThread(0); @@ -193,9 +198,7 @@ cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command & /* send the message */ bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0; - if (output->state == ADAPTER_MESSAGE_STATE_ERROR) - Close(); - else if (bRetry) + if (bRetry) Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT); retVal = output->state; @@ -312,6 +315,7 @@ bool CUSBCECAdapterCommunication::WriteToDevice(CCECAdapterMessage *message) { CLibCEC::AddLog(CEC_LOG_DEBUG, "error writing command '%s' to serial port '%s': %s", CCECAdapterMessage::ToString(message->Message()), m_port->GetName().c_str(), m_port->GetError().c_str()); message->state = ADAPTER_MESSAGE_STATE_ERROR; + Close(); return false; } @@ -332,15 +336,19 @@ bool CUSBCECAdapterCommunication::ReadFromDevice(uint32_t iTimeout, size_t iSize CLockObject lock(m_mutex); if (!m_port || !m_port->IsOpen()) return false; + iBytesRead = m_port->Read(buff, sizeof(uint8_t) * iSize, iTimeout); + + if (m_port->GetErrorNumber()) + { + CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str()); + m_port->Close(); + return false; + } } if (iBytesRead < 0 || iBytesRead > 256) - { - CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str()); - Close(); return false; - } else if (iBytesRead > 0) { /* add the data to the current frame */ diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/USBCECAdapterMessageQueue.cpp index 56c81a6..64f9f00 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/USBCECAdapterMessageQueue.cpp @@ -213,14 +213,38 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) { Clear(); + StopThread(0); } void CCECAdapterMessageQueue::Clear(void) { + StopThread(5); CLockObject lock(m_mutex); + m_writeQueue.Clear(); m_messages.clear(); } +void *CCECAdapterMessageQueue::Process(void) +{ + CCECAdapterMessageQueueEntry *message(NULL); + while (!IsStopped()) + { + /* wait for a new message */ + if (m_writeQueue.Pop(message, 1000)) + { + /* write this message */ + m_com->WriteToDevice(message->m_message); + if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR) + { + message->Signal(); + Clear(); + break; + } + } + } + return NULL; +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -293,13 +317,8 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_messages.insert(make_pair(iEntryId, entry)); } - /* TODO write the message async */ - if (!m_com->WriteToDevice(msg)) - { - /* error! */ - Clear(); - return false; - } + /* add the message to the write queue */ + m_writeQueue.Push(entry); bool bReturn(true); if (entry) diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.h b/src/lib/adapter/USBCECAdapterMessageQueue.h index 9c5be49..e476e26 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.h +++ b/src/lib/adapter/USBCECAdapterMessageQueue.h @@ -32,6 +32,7 @@ */ #include "USBCECAdapterMessage.h" +#include "../platform/threads/threads.h" #include namespace CEC @@ -85,7 +86,6 @@ namespace CEC */ const char *ToString(void) const; - private: /*! * @brief Called when a 'command accepted' message was received. * @param message The message that was received. @@ -120,7 +120,7 @@ namespace CEC PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */ }; - class CCECAdapterMessageQueue + class CCECAdapterMessageQueue : public PLATFORM::CThread { friend class CUSBCECAdapterCommunication; @@ -131,6 +131,7 @@ namespace CEC * @param iQueueSize The outgoing message queue size. */ CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) : + PLATFORM::CThread(), m_com(com), m_iNextMessage(0) {} virtual ~CCECAdapterMessageQueue(void); @@ -160,10 +161,13 @@ namespace CEC */ bool Write(CCECAdapterMessage *msg); + virtual void *Process(void); + private: CUSBCECAdapterCommunication * m_com; /**< the communication handler */ PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */ std::map m_messages; /**< the outgoing message queue */ + PLATFORM::SyncedBuffer m_writeQueue; /**< the queue for messages that are to be written */ uint64_t m_iNextMessage; /**< the index of the next message */ CCECAdapterMessage m_incomingAdapterMessage; /**< the current incoming message that's being assembled */ cec_command m_currentCECFrame; /**< the current incoming CEC command that's being assembled */ diff --git a/src/lib/platform/util/buffer.h b/src/lib/platform/util/buffer.h index 8777661..aa658ee 100644 --- a/src/lib/platform/util/buffer.h +++ b/src/lib/platform/util/buffer.h @@ -41,7 +41,8 @@ namespace PLATFORM { public: SyncedBuffer(size_t iMaxSize = 100) : - m_maxSize(iMaxSize) {} + m_maxSize(iMaxSize), + m_bHasMessages(false) {} virtual ~SyncedBuffer(void) { @@ -53,6 +54,7 @@ namespace PLATFORM CLockObject lock(m_mutex); while (!m_buffer.empty()) m_buffer.pop(); + m_condition.Broadcast(); } size_t Size(void) @@ -74,17 +76,29 @@ namespace PLATFORM return false; m_buffer.push(entry); + m_bHasMessages = true; + m_condition.Signal(); return true; } - bool Pop(_BType &entry) + bool Pop(_BType &entry, uint32_t iTimeoutMs = 0) { bool bReturn(false); CLockObject lock(m_mutex); + + // wait for a signal if the buffer is empty + if (m_buffer.empty() && iTimeoutMs > 0) + { + if (!m_condition.Wait(m_mutex, m_bHasMessages, iTimeoutMs)) + return bReturn; + } + + // pop the first item if (!m_buffer.empty()) { entry = m_buffer.front(); m_buffer.pop(); + m_bHasMessages = !m_buffer.empty(); bReturn = true; } return bReturn; @@ -106,5 +120,7 @@ namespace PLATFORM size_t m_maxSize; std::queue<_BType> m_buffer; CMutex m_mutex; + CCondition m_condition; + bool m_bHasMessages; }; }; -- 2.34.1