X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Flib%2Fadapter%2FPulse-Eight%2FUSBCECAdapterMessageQueue.cpp;h=1b14f445f30e9bfc43afbef5304296d3277cb619;hb=95587b956c69ead0b46d301f5ca70e977890c3fd;hp=c713eeac5d3e050b80de01ed99a7214559bbc2dc;hpb=2b44051cbfa70deafc30d9767323214debbc1a75;p=deb_libcec.git diff --git a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp index c713eea..1b14f44 100644 --- a/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/Pulse-Eight/USBCECAdapterMessageQueue.cpp @@ -50,7 +50,8 @@ CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQue m_message(message), m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1), m_bSucceeded(false), - m_bWaiting(true) {} + m_bWaiting(true), + m_queueTimeout(message->transmit_timeout) {} CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { } @@ -113,21 +114,73 @@ cec_adapter_messagecode CCECAdapterMessageQueueEntry::MessageCode(void) return m_message->Message(); } -bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage &msg) +bool CCECAdapterMessageQueueEntry::IsResponseOld(const CCECAdapterMessage &msg) { cec_adapter_messagecode msgCode = msg.Message(); + return msgCode == MessageCode() || - (m_message->IsTranmission() && msgCode == MSGCODE_TIMEOUT_ERROR) || msgCode == MSGCODE_COMMAND_ACCEPTED || msgCode == MSGCODE_COMMAND_REJECTED || - (m_message->IsTranmission() && msgCode == MSGCODE_HIGH_ERROR) || - (m_message->IsTranmission() && msgCode == MSGCODE_LOW_ERROR) || - (m_message->IsTranmission() && msgCode == MSGCODE_RECEIVE_FAILED) || - (m_message->IsTranmission() && msgCode == MSGCODE_TRANSMIT_FAILED_LINE) || - (m_message->IsTranmission() && msgCode == MSGCODE_TRANSMIT_FAILED_ACK) || - (m_message->IsTranmission() && msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA) || - (m_message->IsTranmission() && msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE) || - (m_message->IsTranmission() && msgCode == MSGCODE_TRANSMIT_SUCCEEDED); + (m_message->IsTranmission() && (msgCode == MSGCODE_TIMEOUT_ERROR || + msgCode == MSGCODE_HIGH_ERROR || + msgCode == MSGCODE_LOW_ERROR || + msgCode == MSGCODE_RECEIVE_FAILED || + msgCode == MSGCODE_TRANSMIT_FAILED_LINE || + msgCode == MSGCODE_TRANSMIT_FAILED_ACK || + msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA || + msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE || + msgCode == MSGCODE_TRANSMIT_SUCCEEDED)); +} + +bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage &msg) +{ + if (m_message->state == ADAPTER_MESSAGE_STATE_SENT_ACKED) + return false; + + cec_adapter_messagecode thisMsgCode = m_message->Message(); + cec_adapter_messagecode msgCode = msg.Message(); + cec_adapter_messagecode msgResponse = msg.ResponseTo(); + + // msgcode matches, always a response + if (msgCode == MessageCode()) + return true; + + if (!ProvidesExtendedResponse()) + return IsResponseOld(msg); + + // response without a msgcode + if (msgResponse == MSGCODE_NOTHING) + return false; + + // commands that only repond with accepted/rejected + if (thisMsgCode == MSGCODE_PING || + thisMsgCode == MSGCODE_SET_ACK_MASK || + thisMsgCode == MSGCODE_SET_CONTROLLED || + thisMsgCode == MSGCODE_SET_AUTO_ENABLED || + thisMsgCode == MSGCODE_SET_DEFAULT_LOGICAL_ADDRESS || + thisMsgCode == MSGCODE_SET_LOGICAL_ADDRESS_MASK || + thisMsgCode == MSGCODE_SET_PHYSICAL_ADDRESS || + thisMsgCode == MSGCODE_SET_DEVICE_TYPE || + thisMsgCode == MSGCODE_SET_HDMI_VERSION || + thisMsgCode == MSGCODE_SET_OSD_NAME || + thisMsgCode == MSGCODE_WRITE_EEPROM || + thisMsgCode == MSGCODE_TRANSMIT_IDLETIME) + return thisMsgCode == msgResponse; + + if (!m_message->IsTranmission()) + { + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "FIXME! not a transmission"); + return false; + } + + return ((msgCode == MSGCODE_COMMAND_ACCEPTED || msgCode == MSGCODE_COMMAND_REJECTED) && + (msgResponse == MSGCODE_TRANSMIT_ACK_POLARITY || msgResponse == MSGCODE_TRANSMIT || msgResponse == MSGCODE_TRANSMIT_EOM)) || + msgCode == MSGCODE_TIMEOUT_ERROR || + msgCode == MSGCODE_RECEIVE_FAILED || + msgCode == MSGCODE_TRANSMIT_FAILED_ACK || + msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA || + msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE || + msgCode == MSGCODE_TRANSMIT_SUCCEEDED; } const char *CCECAdapterMessageQueueEntry::ToString(void) const @@ -150,12 +203,14 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdap /* decrease by 1 */ m_iPacketsLeft--; +#ifdef CEC_DEBUGGING /* log this message */ CStdString strLog; strLog.Format("%s - command accepted", ToString()); if (m_iPacketsLeft > 0) strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog); +#endif /* no more packets left and not a transmission, so we're done */ if (!m_message->IsTranmission() && m_iPacketsLeft == 0) @@ -181,7 +236,9 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAd if (m_iPacketsLeft == 0) { /* transmission succeeded, so we're done */ - m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); +#ifdef CEC_DEBUGGING + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", m_message->ToString().c_str()); +#endif m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; m_message->response = message.packet; } @@ -203,7 +260,9 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess { { CLockObject lock(m_mutex); +#ifdef CEC_DEBUGGING m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str()); +#endif m_message->response = message.packet; if (m_message->IsTranmission()) m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; @@ -216,6 +275,16 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess return true; } +bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void) +{ + return m_queue && m_queue->ProvidesExtendedResponse(); +} + +bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const +{ + return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0); +} + CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) : PLATFORM::CThread(), m_com(com), @@ -227,8 +296,9 @@ CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *co CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) { + StopThread(-1); Clear(); - StopThread(0); + StopThread(); delete m_incomingAdapterMessage; } @@ -261,10 +331,35 @@ void *CCECAdapterMessageQueue::Process(void) break; } } + + CheckTimedOutMessages(); } return NULL; } +void CCECAdapterMessageQueue::CheckTimedOutMessages(void) +{ + CLockObject lock(m_mutex); + vector timedOut; + for (map::iterator it = m_messages.begin(); it != m_messages.end(); it++) + { + if (it->second->TimedOutOrSucceeded()) + { + timedOut.push_back(it->first); + if (!it->second->m_bSucceeded) + m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message())); + delete it->second->m_message; + delete it->second; + } + } + + for (vector::iterator it = timedOut.begin(); it != timedOut.end(); it++) + { + uint64_t iEntryId = *it; + m_messages.erase(iEntryId); + } +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -277,7 +372,12 @@ void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { /* the message wasn't handled */ bool bIsError(m_com->HandlePoll(msg)); +#ifdef CEC_DEBUGGING m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString().c_str()); +#else + if (bIsError) + m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, msg.ToString().c_str()); +#endif /* push this message to the current frame */ if (!bIsError && msg.PushToCecCommand(m_currentCECFrame)) @@ -327,6 +427,13 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) } CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg); + if (!entry) + { + m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message())); + msg->state = ADAPTER_MESSAGE_STATE_ERROR; + return false; + } + uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) @@ -340,7 +447,7 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_writeQueue.Push(entry); bool bReturn(true); - if (entry) + if (!msg->bFireAndForget) { if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) { @@ -354,8 +461,17 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) CLockObject lock(m_mutex); m_messages.erase(iEntryId); } + + if (msg->ReplyIsError()) + msg->state = ADAPTER_MESSAGE_STATE_ERROR; + delete entry; } return bReturn; } + +bool CCECAdapterMessageQueue::ProvidesExtendedResponse(void) +{ + return m_com && m_com->ProvidesExtendedResponse(); +}