From daec0320b0ad9b3e9e84f681c7e968d808bafec8 Mon Sep 17 00:00:00 2001 From: Lars Op den Kamp Date: Wed, 3 Oct 2012 15:11:20 +0200 Subject: [PATCH] fixed - don't wait for a response when sending a reply, so we don't block message handling --- src/lib/CECProcessor.cpp | 4 +- .../Pulse-Eight/USBCECAdapterCommands.cpp | 2 +- .../USBCECAdapterCommunication.cpp | 21 +++++++--- .../Pulse-Eight/USBCECAdapterMessage.cpp | 1 + .../Pulse-Eight/USBCECAdapterMessage.h | 1 + .../Pulse-Eight/USBCECAdapterMessageQueue.cpp | 42 ++++++++++++++++++- .../Pulse-Eight/USBCECAdapterMessageQueue.h | 9 ++++ 7 files changed, 70 insertions(+), 10 deletions(-) diff --git a/src/lib/CECProcessor.cpp b/src/lib/CECProcessor.cpp index 9d49640..409b30b 100644 --- a/src/lib/CECProcessor.cpp +++ b/src/lib/CECProcessor.cpp @@ -437,7 +437,9 @@ bool CCECProcessor::Transmit(const cec_command &data, bool bIsReply) iLineTimeout = m_iRetryLineTimeout; } - return adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED; + return bIsReply ? + adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED || adapterState == ADAPTER_MESSAGE_STATE_SENT || adapterState == ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT : + adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED; } void CCECProcessor::TransmitAbort(cec_logical_address source, cec_logical_address destination, cec_opcode opcode, cec_abort_reason reason /* = CEC_ABORT_REASON_UNRECOGNIZED_OPCODE */) diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterCommands.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterCommands.cpp index ab0fa39..cf97ea9 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterCommands.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterCommands.cpp @@ -91,7 +91,7 @@ uint16_t CUSBCECAdapterCommands::RequestFirmwareVersion(void) m_persistedConfiguration.iFirmwareVersion = (response[0] << 8 | response[1]); else { - LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d)", iFwVersionTry); + LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d, size = %d)", iFwVersionTry, response.size); CEvent::Sleep(500); } } diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterCommunication.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterCommunication.cpp index ae4c8ac..a2c33e7 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterCommunication.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterCommunication.cpp @@ -234,24 +234,33 @@ void CUSBCECAdapterCommunication::Close(void) m_port->Close(); } -cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool UNUSED(bIsReply)) +cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool bIsReply) { cec_adapter_message_state retVal(ADAPTER_MESSAGE_STATE_UNKNOWN); if (!IsRunning()) return retVal; CCECAdapterMessage *output = new CCECAdapterMessage(data, iLineTimeout); + output->bFireAndForget = bIsReply; /* mark as waiting for an ack from the destination */ MarkAsWaiting(data.destination); /* send the message */ - bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0; - if (bRetry) - Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT); - retVal = output->state; + if (bIsReply) + { + retVal = m_adapterMessageQueue->Write(output) ? + ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT : ADAPTER_MESSAGE_STATE_ERROR; + } + else + { + bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0; + if (bRetry) + Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT); + retVal = output->state; - delete output; + delete output; + } return retVal; } diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.cpp index a5be412..f5d0735 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.cpp @@ -266,6 +266,7 @@ void CCECAdapterMessage::Clear(void) packet.Clear(); lineTimeout = 3; bNextByteIsEscaped = false; + bFireAndForget = false; } void CCECAdapterMessage::Shift(uint8_t iShiftBy) diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.h b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.h index 7c8020d..89005e8 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.h +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessage.h @@ -255,6 +255,7 @@ namespace CEC cec_adapter_message_state state; /**< the current state of this message */ int32_t transmit_timeout; /**< the timeout to use when sending this message */ uint8_t lineTimeout; /**< the default CEC line timeout to use when sending this message */ + bool bFireAndForget; /**< true to auto delete, don't wait for a response */ private: bool bNextByteIsEscaped; /**< true when the next byte that is added will be escaped, false otherwise */ diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp index 4e03500..a751952 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp @@ -50,7 +50,8 @@ CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQue m_message(message), m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1), m_bSucceeded(false), - m_bWaiting(true) {} + m_bWaiting(true), + m_queueTimeout(message->transmit_timeout) {} CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { } @@ -270,6 +271,11 @@ bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void) return m_queue && m_queue->ProvidesExtendedResponse(); } +bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const +{ + return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0); +} + CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) : PLATFORM::CThread(), m_com(com), @@ -315,10 +321,35 @@ void *CCECAdapterMessageQueue::Process(void) break; } } + + CheckTimedOutMessages(); } return NULL; } +void CCECAdapterMessageQueue::CheckTimedOutMessages(void) +{ + CLockObject lock(m_mutex); + vector timedOut; + for (map::iterator it = m_messages.begin(); it != m_messages.end(); it++) + { + if (it->second->TimedOutOrSucceeded()) + { + timedOut.push_back(it->first); + if (!it->second->m_bSucceeded) + m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message())); + delete it->second->m_message; + delete it->second; + } + } + + for (vector::iterator it = timedOut.begin(); it != timedOut.end(); it++) + { + uint64_t iEntryId = *it; + m_messages.erase(iEntryId); + } +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -381,6 +412,13 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) } CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg); + if (!entry) + { + m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message())); + msg->state = ADAPTER_MESSAGE_STATE_ERROR; + return false; + } + uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) @@ -394,7 +432,7 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_writeQueue.Push(entry); bool bReturn(true); - if (entry) + if (!msg->bFireAndForget) { if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) { diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.h b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.h index b81c908..7e01436 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.h +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.h @@ -33,6 +33,7 @@ #include "lib/platform/threads/threads.h" #include "lib/platform/util/buffer.h" +#include "lib/platform/util/timeutils.h" #include #include "USBCECAdapterMessage.h" @@ -118,6 +119,11 @@ namespace CEC bool ProvidesExtendedResponse(void); + /*! + * @return True when a fire and forget packet timed out or succeeded, false otherwise + */ + bool TimedOutOrSucceeded(void) const; + CCECAdapterMessageQueue * m_queue; CCECAdapterMessage * m_message; /**< the message that was sent */ uint8_t m_iPacketsLeft; /**< the amount of acks that we're waiting on */ @@ -125,6 +131,7 @@ namespace CEC bool m_bWaiting; /**< true while a thread is waiting or when it hasn't started waiting yet */ PLATFORM::CCondition m_condition; /**< the condition to wait on */ PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */ + PLATFORM::CTimeout m_queueTimeout; /**< ack timeout for fire and forget commands */ }; class CCECAdapterMessageQueue : public PLATFORM::CThread @@ -170,6 +177,8 @@ namespace CEC virtual void *Process(void); + void CheckTimedOutMessages(void); + private: CUSBCECAdapterCommunication * m_com; /**< the communication handler */ PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */ -- 2.34.1