X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Flib%2Fadapter%2FUSBCECAdapterMessageQueue.cpp;h=60625796c05f063fe58e6d295ec89a2402c7fbf1;hb=a615784075997a8a0926e6f35491dfa999b7a9c6;hp=e95a8a75921537889cd9d0c035fe2e775ad3f65c;hpb=a75e3a5a63546d6f7e670bc2a7a1931887a5d2a0;p=deb_libcec.git diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/USBCECAdapterMessageQueue.cpp index e95a8a7..6062579 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/USBCECAdapterMessageQueue.cpp @@ -37,6 +37,7 @@ using namespace CEC; using namespace PLATFORM; +using namespace std; CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) : m_message(message), @@ -54,51 +55,34 @@ void CCECAdapterMessageQueueEntry::Broadcast(void) bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message) { - bool bSendSignal(false); bool bHandled(false); - PLATFORM::CLockObject lock(m_mutex); - if (!IsResponse(message)) + if (IsResponse(message)) { - /* we received a message from the adapter that's not a response to this command */ - if (!message.IsTranmission()) - { - /* we received something that's not a transmission while waiting for an ack to this command, so this command failed */ - - //TODO verify whether we're not failing too soon - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - %s - not a response %s - failed", __FUNCTION__, ToString(), CCECAdapterMessage::ToString(message.Message())); - m_message->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - bSendSignal = true; - } - } - else - { - /* we received a response, so this message is handled */ - bHandled = true; switch (message.Message()) { case MSGCODE_COMMAND_ACCEPTED: - bSendSignal = MessageReceivedCommandAccepted(message); + bHandled = MessageReceivedCommandAccepted(message); break; case MSGCODE_TRANSMIT_SUCCEEDED: - bSendSignal = MessageReceivedTransmitSucceeded(message); + bHandled = MessageReceivedTransmitSucceeded(message); break; default: - bSendSignal = MessageReceivedResponse(message); + bHandled = MessageReceivedResponse(message); break; } } - /* signal the waiting thread when we're done */ - if (bSendSignal) - { - m_bSucceeded = true; - m_condition.Signal(); - } - return bHandled; } +void CCECAdapterMessageQueueEntry::Signal(void) +{ + CLockObject lock(m_mutex); + m_bSucceeded = true; + m_condition.Signal(); +} + bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout) { bool bReturn(false); @@ -151,63 +135,77 @@ const char *CCECAdapterMessageQueueEntry::ToString(void) const bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message) { bool bSendSignal(false); - if (m_iPacketsLeft == 0) - { - /* we received a "command accepted", but we're not waiting for one anymore */ - CLibCEC::AddLog(CEC_LOG_ERROR, "%s - received unexpected 'command accepted' message", ToString()); - m_message->state = ADAPTER_MESSAGE_STATE_ERROR; - bSendSignal = true; - } - else + bool bHandled(false); { - /* decrease number of acks we're waiting on by 1 */ + CLockObject lock(m_mutex); if (m_iPacketsLeft > 0) + { + /* decrease by 1 */ m_iPacketsLeft--; - /* log this message */ - CStdString strLog; - strLog.Format("%s - command accepted", ToString()); - if (m_iPacketsLeft > 0) - strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); - CLibCEC::AddLog(CEC_LOG_DEBUG, strLog); - - /* no more packets left and not a transmission, so we're done */ - if (!m_message->IsTranmission() && m_iPacketsLeft == 0) - { - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; - m_message->response = message.packet; - bSendSignal = true; + /* log this message */ + CStdString strLog; + strLog.Format("%s - command accepted", ToString()); + if (m_iPacketsLeft > 0) + strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); + CLibCEC::AddLog(CEC_LOG_DEBUG, strLog); + + /* no more packets left and not a transmission, so we're done */ + if (!m_message->IsTranmission() && m_iPacketsLeft == 0) + { + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + m_message->response = message.packet; + bSendSignal = true; + } + bHandled = true; } } - return bSendSignal; + + if (bSendSignal) + Signal(); + + return bHandled; } bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message) { - if (m_iPacketsLeft == 0) - { - /* transmission succeeded, so we're done */ - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; - m_message->response = message.packet; - } - else { - /* error, we expected more acks */ - CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); - m_message->state = ADAPTER_MESSAGE_STATE_ERROR; + CLockObject lock(m_mutex); + if (m_iPacketsLeft == 0) + { + /* transmission succeeded, so we're done */ + CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + m_message->response = message.packet; + } + else + { + /* error, we expected more acks + since the messages are processed in order, this should not happen, so this is an error situation */ + CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); + m_message->state = ADAPTER_MESSAGE_STATE_ERROR; + } } + + Signal(); + return true; } bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message) { - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString()); - m_message->response = message.packet; - if (m_message->IsTranmission()) - m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - else - m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + { + CLockObject lock(m_mutex); + CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response '%s'", ToString(), CCECAdapterMessage::ToString(message.Message())); + m_message->response = message.packet; + if (m_message->IsTranmission()) + m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; + else + m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; + } + + Signal(); + return true; } @@ -220,20 +218,18 @@ CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) void CCECAdapterMessageQueue::Clear(void) { CLockObject lock(m_mutex); - CCECAdapterMessageQueueEntry *message(NULL); - while (m_messages.Pop(message)) - message->Broadcast(); + m_messages.clear(); } void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { + bool bHandled(false); CLockObject lock(m_mutex); - CCECAdapterMessageQueueEntry *message = GetNextQueuedEntry(); - - /* send the received message to the first entry in the queue */ - bool bHandled = message ? message->MessageReceived(msg) : false; + /* send the received message to each entry in the queue until it is handled */ + for (map::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++) + bHandled = it->second->MessageReceived(msg); - if (!message || !bHandled) + if (!bHandled) { /* the message wasn't handled */ bool bIsError(m_com->HandlePoll(msg)); @@ -283,19 +279,18 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) /* set the correct line timeout */ if (msg->IsTranmission()) { - if (msg->tries == 1) - m_com->SetLineTimeout(msg->lineTimeout); - else - m_com->SetLineTimeout(msg->retryTimeout); + m_com->SetLineTimeout(msg->lineTimeout); } CCECAdapterMessageQueueEntry *entry(NULL); + uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) { + CLockObject lock(m_mutex); entry = new CCECAdapterMessageQueueEntry(msg); - PLATFORM::CLockObject lock(m_mutex); - m_messages.Push(entry); + iEntryId = m_iNextMessage++; + m_messages.insert(make_pair(iEntryId, entry)); } /* TODO write the message async */ @@ -306,28 +301,20 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) return false; } - if (entry && !entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) + bool bReturn(true); + if (entry) { - CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); - msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; - return false; - } - - return true; -} - -CCECAdapterMessageQueueEntry *CCECAdapterMessageQueue::GetNextQueuedEntry(void) -{ - CCECAdapterMessageQueueEntry *message(NULL); - while (message == NULL && m_messages.Peek(message)) - { - if (!message->IsWaiting()) + if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) { - /* delete old messages */ - m_messages.Pop(message); - delete message; - message = NULL; + CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); + msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; + bReturn = false; } + + CLockObject lock(m_mutex); + m_messages.erase(iEntryId); + delete entry; } - return message; + + return bReturn; }