using namespace CEC;
using namespace PLATFORM;
+using namespace std;
CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) :
m_message(message),
bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message)
{
- bool bSendSignal(false);
bool bHandled(false);
- PLATFORM::CLockObject lock(m_mutex);
- if (!IsResponse(message))
+ if (IsResponse(message))
{
- /* we received a message from the adapter that's not a response to this command */
- if (!message.IsTranmission())
- {
- /* we received something that's not a transmission while waiting for an ack to this command, so this command failed */
-
- //TODO verify whether we're not failing too soon
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - %s - not a response %s - failed", __FUNCTION__, ToString(), CCECAdapterMessage::ToString(message.Message()));
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- bSendSignal = true;
- }
- }
- else
- {
- /* we received a response, so this message is handled */
- bHandled = true;
switch (message.Message())
{
case MSGCODE_COMMAND_ACCEPTED:
- bSendSignal = MessageReceivedCommandAccepted(message);
+ bHandled = MessageReceivedCommandAccepted(message);
break;
case MSGCODE_TRANSMIT_SUCCEEDED:
- bSendSignal = MessageReceivedTransmitSucceeded(message);
+ bHandled = MessageReceivedTransmitSucceeded(message);
break;
default:
- bSendSignal = MessageReceivedResponse(message);
+ bHandled = MessageReceivedResponse(message);
break;
}
}
- /* signal the waiting thread when we're done */
- if (bSendSignal)
- {
- m_bSucceeded = true;
- m_condition.Signal();
- }
-
return bHandled;
}
+void CCECAdapterMessageQueueEntry::Signal(void)
+{
+ CLockObject lock(m_mutex);
+ m_bSucceeded = true;
+ m_condition.Signal();
+}
+
bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout)
{
bool bReturn(false);
bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message)
{
bool bSendSignal(false);
- if (m_iPacketsLeft == 0)
- {
- /* we received a "command accepted", but we're not waiting for one anymore */
- CLibCEC::AddLog(CEC_LOG_ERROR, "%s - received unexpected 'command accepted' message", ToString());
- m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
- bSendSignal = true;
- }
- else
+ bool bHandled(false);
{
- /* decrease number of acks we're waiting on by 1 */
+ CLockObject lock(m_mutex);
if (m_iPacketsLeft > 0)
+ {
+ /* decrease by 1 */
m_iPacketsLeft--;
- /* log this message */
- CStdString strLog;
- strLog.Format("%s - command accepted", ToString());
- if (m_iPacketsLeft > 0)
- strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
- CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
-
- /* no more packets left and not a transmission, so we're done */
- if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
- {
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
- m_message->response = message.packet;
- bSendSignal = true;
+ /* log this message */
+ CStdString strLog;
+ strLog.Format("%s - command accepted", ToString());
+ if (m_iPacketsLeft > 0)
+ strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
+ CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
+
+ /* no more packets left and not a transmission, so we're done */
+ if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
+ {
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ m_message->response = message.packet;
+ bSendSignal = true;
+ }
+ bHandled = true;
}
}
- return bSendSignal;
+
+ if (bSendSignal)
+ Signal();
+
+ return bHandled;
}
bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message)
{
- if (m_iPacketsLeft == 0)
- {
- /* transmission succeeded, so we're done */
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
- m_message->response = message.packet;
- }
- else
{
- /* error, we expected more acks */
- CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
- m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+ CLockObject lock(m_mutex);
+ if (m_iPacketsLeft == 0)
+ {
+ /* transmission succeeded, so we're done */
+ CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ m_message->response = message.packet;
+ }
+ else
+ {
+ /* error, we expected more acks
+ since the messages are processed in order, this should not happen, so this is an error situation */
+ CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
+ m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+ }
}
+
+ Signal();
+
return true;
}
bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message)
{
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
- m_message->response = message.packet;
- if (m_message->IsTranmission())
- m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- else
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ {
+ CLockObject lock(m_mutex);
+ CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
+ m_message->response = message.packet;
+ if (m_message->IsTranmission())
+ m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+ else
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ }
+
+ Signal();
+
return true;
}
void CCECAdapterMessageQueue::Clear(void)
{
CLockObject lock(m_mutex);
- CCECAdapterMessageQueueEntry *message(NULL);
- while (m_messages.Pop(message))
- message->Broadcast();
+ m_messages.clear();
}
void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
{
+ bool bHandled(false);
CLockObject lock(m_mutex);
- CCECAdapterMessageQueueEntry *message = GetNextQueuedEntry();
+ /* send the received message to each entry in the queue until it is handled */
+ for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++)
+ bHandled = it->second->MessageReceived(msg);
- /* send the received message to the first entry in the queue */
- bool bHandled = message ? message->MessageReceived(msg) : false;
-
- if (!message || !bHandled)
+ if (!bHandled)
{
/* the message wasn't handled */
bool bIsError(m_com->HandlePoll(msg));
}
CCECAdapterMessageQueueEntry *entry(NULL);
+ uint64_t iEntryId(0);
/* add to the wait for ack queue */
if (msg->Message() != MSGCODE_START_BOOTLOADER)
{
+ CLockObject lock(m_mutex);
entry = new CCECAdapterMessageQueueEntry(msg);
- PLATFORM::CLockObject lock(m_mutex);
- m_messages.Push(entry);
+ iEntryId = m_iNextMessage++;
+ m_messages.insert(make_pair(iEntryId, entry));
}
/* TODO write the message async */
return false;
}
- if (entry && !entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
+ bool bReturn(true);
+ if (entry)
{
- CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
- msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- return false;
- }
-
- return true;
-}
-
-CCECAdapterMessageQueueEntry *CCECAdapterMessageQueue::GetNextQueuedEntry(void)
-{
- CCECAdapterMessageQueueEntry *message(NULL);
- while (message == NULL && m_messages.Peek(message))
- {
- if (!message->IsWaiting())
+ if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
{
- /* delete old messages */
- m_messages.Pop(message);
- delete message;
- message = NULL;
+ CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
+ msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+ bReturn = false;
}
+
+ CLockObject lock(m_mutex);
+ m_messages.erase(iEntryId);
}
- return message;
+
+ return bReturn;
}
*/
#include "USBCECAdapterMessage.h"
+#include <map>
namespace CEC
{
/*!
* @brief Called when a 'command accepted' message was received.
* @param message The message that was received.
- * @return True when the waiting thread need to be signaled, false otherwise.
+ * @return True when the message was handled, false otherwise.
*/
bool MessageReceivedCommandAccepted(const CCECAdapterMessage &message);
/*!
* @brief Called when a 'transmit succeeded' message was received.
* @param message The message that was received.
- * @return True when the waiting thread need to be signaled, false otherwise.
+ * @return True when the message was handled, false otherwise.
*/
bool MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message);
/*!
* @brief Called when a message that's not a 'command accepted' or 'transmit succeeded' message was received.
* @param message The message that was received.
- * @return True when the waiting thread need to be signaled, false otherwise.
+ * @return True when the message was handled, false otherwise.
*/
bool MessageReceivedResponse(const CCECAdapterMessage &message);
+ /*!
+ * @brief Signals the waiting thread.
+ */
+ void Signal(void);
+
CCECAdapterMessage * m_message; /**< the message that was sent */
uint8_t m_iPacketsLeft; /**< the amount of acks that we're waiting on */
bool m_bSucceeded; /**< true when the command received a response, false otherwise */
* @param com The communication handler callback to use.
* @param iQueueSize The outgoing message queue size.
*/
- CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com, size_t iQueueSize = 64) :
+ CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
m_com(com),
- m_messages(iQueueSize) {}
+ m_iNextMessage(0) {}
virtual ~CCECAdapterMessageQueue(void);
/*!
bool Write(CCECAdapterMessage *msg);
private:
- /*!
- * @return The next message in the queue, or NULL if there is none.
- */
- CCECAdapterMessageQueueEntry *GetNextQueuedEntry(void);
-
CUSBCECAdapterCommunication * m_com; /**< the communication handler */
PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */
- PLATFORM::SyncedBuffer<CCECAdapterMessageQueueEntry *> m_messages; /**< the outgoing message queue */
+ std::map<uint64_t, CCECAdapterMessageQueueEntry *> m_messages; /**< the outgoing message queue */
+ uint64_t m_iNextMessage; /**< the index of the next message */
CCECAdapterMessage m_incomingAdapterMessage; /**< the current incoming message that's being assembled */
cec_command m_currentCECFrame; /**< the current incoming CEC command that's being assembled */
};