X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Flib%2Fadapter%2FPulse-Eight%2FUSBCECAdapterMessageQueue.cpp;h=35faeb71890b73892fb7a5c67b17b532b2a3d625;hb=496897540ebcc6fc13b346aa5e08f2148a2770c7;hp=4e035007e6423543f39e5158117b4e70b0b7a419;hpb=9f236526a89d6281101620a1c5a9cc1a6fe1aa03;p=deb_libcec.git diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp index 4e03500..35faeb7 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp @@ -50,7 +50,8 @@ CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQue m_message(message), m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1), m_bSucceeded(false), - m_bWaiting(true) {} + m_bWaiting(true), + m_queueTimeout(message->transmit_timeout) {} CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { } @@ -133,6 +134,9 @@ bool CCECAdapterMessageQueueEntry::IsResponseOld(const CCECAdapterMessage &msg) bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage &msg) { + if (m_message->state == ADAPTER_MESSAGE_STATE_SENT_ACKED) + return false; + cec_adapter_messagecode thisMsgCode = m_message->Message(); cec_adapter_messagecode msgCode = msg.Message(); cec_adapter_messagecode msgResponse = msg.ResponseTo(); @@ -199,12 +203,14 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdap /* decrease by 1 */ m_iPacketsLeft--; +#ifdef CEC_DEBUGGING /* log this message */ CStdString strLog; strLog.Format("%s - command accepted", ToString()); if (m_iPacketsLeft > 0) strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog); +#endif /* no more packets left and not a transmission, so we're done */ if (!m_message->IsTranmission() && m_iPacketsLeft == 0) @@ -230,7 +236,9 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAd if (m_iPacketsLeft == 0) { /* transmission succeeded, so we're done */ - m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); +#ifdef CEC_DEBUGGING + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", m_message->ToString().c_str()); +#endif m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; m_message->response = message.packet; } @@ -252,7 +260,9 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess { { CLockObject lock(m_mutex); +#ifdef CEC_DEBUGGING m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str()); +#endif m_message->response = message.packet; if (m_message->IsTranmission()) m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; @@ -270,6 +280,11 @@ bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void) return m_queue && m_queue->ProvidesExtendedResponse(); } +bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const +{ + return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0); +} + CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) : PLATFORM::CThread(), m_com(com), @@ -315,10 +330,35 @@ void *CCECAdapterMessageQueue::Process(void) break; } } + + CheckTimedOutMessages(); } return NULL; } +void CCECAdapterMessageQueue::CheckTimedOutMessages(void) +{ + CLockObject lock(m_mutex); + vector timedOut; + for (map::iterator it = m_messages.begin(); it != m_messages.end(); it++) + { + if (it->second->TimedOutOrSucceeded()) + { + timedOut.push_back(it->first); + if (!it->second->m_bSucceeded) + m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message())); + delete it->second->m_message; + delete it->second; + } + } + + for (vector::iterator it = timedOut.begin(); it != timedOut.end(); it++) + { + uint64_t iEntryId = *it; + m_messages.erase(iEntryId); + } +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -331,7 +371,12 @@ void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { /* the message wasn't handled */ bool bIsError(m_com->HandlePoll(msg)); +#ifdef CEC_DEBUGGING m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString().c_str()); +#else + if (bIsError) + m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, msg.ToString().c_str()); +#endif /* push this message to the current frame */ if (!bIsError && msg.PushToCecCommand(m_currentCECFrame)) @@ -381,6 +426,13 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) } CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg); + if (!entry) + { + m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message())); + msg->state = ADAPTER_MESSAGE_STATE_ERROR; + return false; + } + uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) @@ -394,7 +446,7 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_writeQueue.Push(entry); bool bReturn(true); - if (entry) + if (!msg->bFireAndForget) { if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) {