using namespace CEC;
using namespace PLATFORM;
+using namespace std;
-CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) :
+#define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
+
+CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue *queue, CCECAdapterMessage *message) :
+ m_queue(queue),
m_message(message),
m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
m_bSucceeded(false),
bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message)
{
- bool bSendSignal(false);
bool bHandled(false);
- PLATFORM::CLockObject lock(m_mutex);
- if (!IsResponse(message))
- {
- /* we received a message from the adapter that's not a response to this command */
- if (!message.IsTranmission())
- {
- /* we received something that's not a transmission while waiting for an ack to this command, so this command failed */
-
- //TODO verify whether we're not failing too soon
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - %s - not a response %s - failed", __FUNCTION__, ToString(), CCECAdapterMessage::ToString(message.Message()));
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- bSendSignal = true;
- }
- }
- else
+ if (IsResponse(message))
{
- /* we received a response, so this message is handled */
- bHandled = true;
switch (message.Message())
{
case MSGCODE_COMMAND_ACCEPTED:
- bSendSignal = MessageReceivedCommandAccepted(message);
+ bHandled = MessageReceivedCommandAccepted(message);
break;
case MSGCODE_TRANSMIT_SUCCEEDED:
- bSendSignal = MessageReceivedTransmitSucceeded(message);
+ bHandled = MessageReceivedTransmitSucceeded(message);
break;
default:
- bSendSignal = MessageReceivedResponse(message);
+ bHandled = MessageReceivedResponse(message);
break;
}
}
- /* signal the waiting thread when we're done */
- if (bSendSignal)
- {
- m_bSucceeded = true;
- m_condition.Signal();
- }
-
return bHandled;
}
+void CCECAdapterMessageQueueEntry::Signal(void)
+{
+ CLockObject lock(m_mutex);
+ m_bSucceeded = true;
+ m_condition.Signal();
+}
+
bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout)
{
bool bReturn(false);
{
cec_adapter_messagecode msgCode = msg.Message();
return msgCode == MessageCode() ||
- msgCode == MSGCODE_TIMEOUT_ERROR ||
+ (m_message->IsTranmission() && msgCode == MSGCODE_TIMEOUT_ERROR) ||
msgCode == MSGCODE_COMMAND_ACCEPTED ||
msgCode == MSGCODE_COMMAND_REJECTED ||
(m_message->IsTranmission() && msgCode == MSGCODE_HIGH_ERROR) ||
bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message)
{
bool bSendSignal(false);
- if (m_iPacketsLeft == 0)
- {
- /* we received a "command accepted", but we're not waiting for one anymore */
- CLibCEC::AddLog(CEC_LOG_ERROR, "%s - received unexpected 'command accepted' message", ToString());
- m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
- bSendSignal = true;
- }
- else
+ bool bHandled(false);
{
- /* decrease number of acks we're waiting on by 1 */
+ CLockObject lock(m_mutex);
if (m_iPacketsLeft > 0)
+ {
+ /* decrease by 1 */
m_iPacketsLeft--;
- /* log this message */
- CStdString strLog;
- strLog.Format("%s - command accepted", ToString());
- if (m_iPacketsLeft > 0)
- strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
- CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
-
- /* no more packets left and not a transmission, so we're done */
- if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
- {
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
- m_message->response = message.packet;
- bSendSignal = true;
+ /* log this message */
+ CStdString strLog;
+ strLog.Format("%s - command accepted", ToString());
+ if (m_iPacketsLeft > 0)
+ strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog);
+
+ /* no more packets left and not a transmission, so we're done */
+ if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
+ {
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ m_message->response = message.packet;
+ bSendSignal = true;
+ }
+ bHandled = true;
}
}
- return bSendSignal;
+
+ if (bSendSignal)
+ Signal();
+
+ return bHandled;
}
bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message)
{
- if (m_iPacketsLeft == 0)
- {
- /* transmission succeeded, so we're done */
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
- m_message->response = message.packet;
- }
- else
{
- /* error, we expected more acks */
- CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
- m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+ CLockObject lock(m_mutex);
+ if (m_iPacketsLeft == 0)
+ {
+ /* transmission succeeded, so we're done */
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ m_message->response = message.packet;
+ }
+ else
+ {
+ /* error, we expected more acks
+ since the messages are processed in order, this should not happen, so this is an error situation */
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
+ m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
+ }
}
+
+ Signal();
+
return true;
}
bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message)
{
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
- m_message->response = message.packet;
- if (m_message->IsTranmission())
- m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- else
- m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ {
+ CLockObject lock(m_mutex);
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str());
+ m_message->response = message.packet;
+ if (m_message->IsTranmission())
+ m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+ else
+ m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
+ }
+
+ Signal();
+
return true;
}
CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
{
Clear();
+ StopThread(0);
}
void CCECAdapterMessageQueue::Clear(void)
{
+ StopThread(5);
CLockObject lock(m_mutex);
+ m_writeQueue.Clear();
+ m_messages.clear();
+}
+
+void *CCECAdapterMessageQueue::Process(void)
+{
CCECAdapterMessageQueueEntry *message(NULL);
- while (m_messages.Pop(message))
- message->Broadcast();
+ while (!IsStopped())
+ {
+ /* wait for a new message */
+ if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message)
+ {
+ /* write this message */
+ {
+ CLockObject lock(m_mutex);
+ m_com->WriteToDevice(message->m_message);
+ }
+ if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR ||
+ message->m_message->Message() == MSGCODE_START_BOOTLOADER)
+ {
+ message->Signal();
+ Clear();
+ break;
+ }
+ }
+ }
+ return NULL;
}
void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
{
+ bool bHandled(false);
CLockObject lock(m_mutex);
- CCECAdapterMessageQueueEntry *message = GetNextQueuedEntry();
+ /* send the received message to each entry in the queue until it is handled */
+ for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++)
+ bHandled = it->second->MessageReceived(msg);
- /* send the received message to the first entry in the queue */
- bool bHandled = message ? message->MessageReceived(msg) : false;
-
- if (!message || !bHandled)
+ if (!bHandled)
{
/* the message wasn't handled */
bool bIsError(m_com->HandlePoll(msg));
- CLibCEC::AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString());
+ m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString());
/* push this message to the current frame */
if (!bIsError && msg.PushToCecCommand(m_currentCECFrame))
/* set the correct line timeout */
if (msg->IsTranmission())
{
- if (msg->tries == 1)
- m_com->SetLineTimeout(msg->lineTimeout);
- else
- m_com->SetLineTimeout(msg->retryTimeout);
+ m_com->SetLineTimeout(msg->lineTimeout);
}
- CCECAdapterMessageQueueEntry *entry(NULL);
+ CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
+ uint64_t iEntryId(0);
/* add to the wait for ack queue */
if (msg->Message() != MSGCODE_START_BOOTLOADER)
{
- entry = new CCECAdapterMessageQueueEntry(msg);
- PLATFORM::CLockObject lock(m_mutex);
- m_messages.Push(entry);
+ CLockObject lock(m_mutex);
+ iEntryId = m_iNextMessage++;
+ m_messages.insert(make_pair(iEntryId, entry));
}
- /* TODO write the message async */
- if (!m_com->WriteToDevice(msg))
- {
- /* error! */
- Clear();
- return false;
- }
+ /* add the message to the write queue */
+ m_writeQueue.Push(entry);
- if (entry && !entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
+ bool bReturn(true);
+ if (entry)
{
- CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
- msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
- return false;
- }
-
- return true;
-}
+ if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
+ {
+ m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
+ msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
+ bReturn = false;
+ }
-CCECAdapterMessageQueueEntry *CCECAdapterMessageQueue::GetNextQueuedEntry(void)
-{
- CCECAdapterMessageQueueEntry *message(NULL);
- while (message == NULL && m_messages.Peek(message))
- {
- if (!message->IsWaiting())
+ if (msg->Message() != MSGCODE_START_BOOTLOADER)
{
- /* delete old messages */
- m_messages.Pop(message);
- delete message;
- message = NULL;
+ CLockObject lock(m_mutex);
+ m_messages.erase(iEntryId);
}
+ delete entry;
}
- return message;
+
+ return bReturn;
}