using namespace PLATFORM;
using namespace std;
-CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) :
+#define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
+
+CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue *queue, CCECAdapterMessage *message) :
+ m_queue(queue),
m_message(message),
m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
m_bSucceeded(false),
{
cec_adapter_messagecode msgCode = msg.Message();
return msgCode == MessageCode() ||
- msgCode == MSGCODE_TIMEOUT_ERROR ||
+ (m_message->IsTranmission() && msgCode == MSGCODE_TIMEOUT_ERROR) ||
msgCode == MSGCODE_COMMAND_ACCEPTED ||
msgCode == MSGCODE_COMMAND_REJECTED ||
(m_message->IsTranmission() && msgCode == MSGCODE_HIGH_ERROR) ||
strLog.Format("%s - command accepted", ToString());
if (m_iPacketsLeft > 0)
strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
- CLibCEC::AddLog(CEC_LOG_DEBUG, strLog);
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog);
/* no more packets left and not a transmission, so we're done */
if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
if (m_iPacketsLeft == 0)
{
/* transmission succeeded, so we're done */
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
m_message->response = message.packet;
}
{
/* error, we expected more acks
since the messages are processed in order, this should not happen, so this is an error situation */
- CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
}
}
{
{
CLockObject lock(m_mutex);
- CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString());
+ m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str());
m_message->response = message.packet;
if (m_message->IsTranmission())
m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
{
Clear();
+ StopThread(0);
}
void CCECAdapterMessageQueue::Clear(void)
{
+ StopThread(5);
CLockObject lock(m_mutex);
+ m_writeQueue.Clear();
m_messages.clear();
}
+void *CCECAdapterMessageQueue::Process(void)
+{
+ CCECAdapterMessageQueueEntry *message(NULL);
+ while (!IsStopped())
+ {
+ /* wait for a new message */
+ if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message)
+ {
+ /* write this message */
+ {
+ CLockObject lock(m_mutex);
+ m_com->WriteToDevice(message->m_message);
+ }
+ if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR ||
+ message->m_message->Message() == MSGCODE_START_BOOTLOADER)
+ {
+ message->Signal();
+ Clear();
+ break;
+ }
+ }
+ }
+ return NULL;
+}
+
void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
{
bool bHandled(false);
{
/* the message wasn't handled */
bool bIsError(m_com->HandlePoll(msg));
- CLibCEC::AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString());
+ m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString());
/* push this message to the current frame */
if (!bIsError && msg.PushToCecCommand(m_currentCECFrame))
m_com->SetLineTimeout(msg->lineTimeout);
}
- CCECAdapterMessageQueueEntry *entry(NULL);
+ CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
uint64_t iEntryId(0);
/* add to the wait for ack queue */
if (msg->Message() != MSGCODE_START_BOOTLOADER)
{
CLockObject lock(m_mutex);
- entry = new CCECAdapterMessageQueueEntry(msg);
iEntryId = m_iNextMessage++;
m_messages.insert(make_pair(iEntryId, entry));
}
- /* TODO write the message async */
- if (!m_com->WriteToDevice(msg))
- {
- /* error! */
- Clear();
- return false;
- }
+ /* add the message to the write queue */
+ m_writeQueue.Push(entry);
bool bReturn(true);
if (entry)
{
if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
{
- CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
+ m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
bReturn = false;
}
- CLockObject lock(m_mutex);
- m_messages.erase(iEntryId);
+ if (msg->Message() != MSGCODE_START_BOOTLOADER)
+ {
+ CLockObject lock(m_mutex);
+ m_messages.erase(iEntryId);
+ }
delete entry;
}