iLineTimeout = m_iRetryLineTimeout;
}
- return adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ return bIsReply ?
+ adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED || adapterState == ADAPTER_MESSAGE_STATE_SENT || adapterState == ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT :
+ adapterState == ADAPTER_MESSAGE_STATE_SENT_ACKED;
}
void CCECProcessor::TransmitAbort(cec_logical_address source, cec_logical_address destination, cec_opcode opcode, cec_abort_reason reason /* = CEC_ABORT_REASON_UNRECOGNIZED_OPCODE */)
m_persistedConfiguration.iFirmwareVersion = (response[0] << 8 | response[1]);
else
{
- LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d)", iFwVersionTry);
+ LIB_CEC->AddLog(CEC_LOG_WARNING, "the adapter did not respond with a correct firmware version (try %d, size = %d)", iFwVersionTry, response.size);
CEvent::Sleep(500);
}
}
m_port->Close();
}
-cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool UNUSED(bIsReply))
+cec_adapter_message_state CUSBCECAdapterCommunication::Write(const cec_command &data, bool &bRetry, uint8_t iLineTimeout, bool bIsReply)
{
cec_adapter_message_state retVal(ADAPTER_MESSAGE_STATE_UNKNOWN);
if (!IsRunning())
return retVal;
CCECAdapterMessage *output = new CCECAdapterMessage(data, iLineTimeout);
+ output->bFireAndForget = bIsReply;
/* mark as waiting for an ack from the destination */
MarkAsWaiting(data.destination);
/* send the message */
- bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
- if (bRetry)
- Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
- retVal = output->state;
+ if (bIsReply)
+ {
+ retVal = m_adapterMessageQueue->Write(output) ?
+ ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT : ADAPTER_MESSAGE_STATE_ERROR;
+ }
+ else
+ {
+ bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
+ if (bRetry)
+ Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
+ retVal = output->state;
- delete output;
+ delete output;
+ }
return retVal;
}
packet.Clear();
lineTimeout = 3;
bNextByteIsEscaped = false;
+ bFireAndForget = false;
}
void CCECAdapterMessage::Shift(uint8_t iShiftBy)
cec_adapter_message_state state; /**< the current state of this message */
int32_t transmit_timeout; /**< the timeout to use when sending this message */
uint8_t lineTimeout; /**< the default CEC line timeout to use when sending this message */
+ bool bFireAndForget; /**< true to auto delete, don't wait for a response */
private:
bool bNextByteIsEscaped; /**< true when the next byte that is added will be escaped, false otherwise */
m_message(message),
m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
m_bSucceeded(false),
- m_bWaiting(true) {}
+ m_bWaiting(true),
+ m_queueTimeout(message->transmit_timeout) {}
CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { }
return m_queue && m_queue->ProvidesExtendedResponse();
}
+bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const
+{
+ return m_message->bFireAndForget && (m_bSucceeded || m_queueTimeout.TimeLeft() == 0);
+}
+
CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
PLATFORM::CThread(),
m_com(com),
break;
}
}
+
+ CheckTimedOutMessages();
}
return NULL;
}
+void CCECAdapterMessageQueue::CheckTimedOutMessages(void)
+{
+ CLockObject lock(m_mutex);
+ vector<uint64_t> timedOut;
+ for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); it != m_messages.end(); it++)
+ {
+ if (it->second->TimedOutOrSucceeded())
+ {
+ timedOut.push_back(it->first);
+ if (!it->second->m_bSucceeded)
+ m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it->second->m_message->Message()));
+ delete it->second->m_message;
+ delete it->second;
+ }
+ }
+
+ for (vector<uint64_t>::iterator it = timedOut.begin(); it != timedOut.end(); it++)
+ {
+ uint64_t iEntryId = *it;
+ m_messages.erase(iEntryId);
+ }
+}
+
void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
{
bool bHandled(false);
}
CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
+ if (!entry)
+ {
+ m_com->m_callback->GetLib()->AddLog(CEC_LOG_ERROR, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg->Message()));
+ msg->state = ADAPTER_MESSAGE_STATE_ERROR;
+ return false;
+ }
+
uint64_t iEntryId(0);
/* add to the wait for ack queue */
if (msg->Message() != MSGCODE_START_BOOTLOADER)
m_writeQueue.Push(entry);
bool bReturn(true);
- if (entry)
+ if (!msg->bFireAndForget)
{
if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
{
#include "lib/platform/threads/threads.h"
#include "lib/platform/util/buffer.h"
+#include "lib/platform/util/timeutils.h"
#include <map>
#include "USBCECAdapterMessage.h"
bool ProvidesExtendedResponse(void);
+ /*!
+ * @return True when a fire and forget packet timed out or succeeded, false otherwise
+ */
+ bool TimedOutOrSucceeded(void) const;
+
CCECAdapterMessageQueue * m_queue;
CCECAdapterMessage * m_message; /**< the message that was sent */
uint8_t m_iPacketsLeft; /**< the amount of acks that we're waiting on */
bool m_bWaiting; /**< true while a thread is waiting or when it hasn't started waiting yet */
PLATFORM::CCondition<bool> m_condition; /**< the condition to wait on */
PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */
+ PLATFORM::CTimeout m_queueTimeout; /**< ack timeout for fire and forget commands */
};
class CCECAdapterMessageQueue : public PLATFORM::CThread
virtual void *Process(void);
+ void CheckTimedOutMessages(void);
+
private:
CUSBCECAdapterCommunication * m_com; /**< the communication handler */
PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */