X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Flib%2Fadapter%2FUSBCECAdapterMessageQueue.cpp;h=45b348c8752af10aed323f993fc118fd918a6c45;hb=99aeafb929fa132a096c236c4ae1eb78c2a595ec;hp=7f988949e0d93a5b9f10a60c3832a0d64e4ec010;hpb=4c2e665c0dcf87d0b45c07eb592a1aebf4ccf1f7;p=deb_libcec.git diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/USBCECAdapterMessageQueue.cpp index 7f98894..45b348c 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/USBCECAdapterMessageQueue.cpp @@ -39,7 +39,10 @@ using namespace CEC; using namespace PLATFORM; using namespace std; -CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) : +#define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000 + +CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue *queue, CCECAdapterMessage *message) : + m_queue(queue), m_message(message), m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1), m_bSucceeded(false), @@ -148,7 +151,7 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdap strLog.Format("%s - command accepted", ToString()); if (m_iPacketsLeft > 0) strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft); - CLibCEC::AddLog(CEC_LOG_DEBUG, strLog); + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog); /* no more packets left and not a transmission, so we're done */ if (!m_message->IsTranmission() && m_iPacketsLeft == 0) @@ -174,7 +177,7 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAd if (m_iPacketsLeft == 0) { /* transmission succeeded, so we're done */ - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString()); m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED; m_message->response = message.packet; } @@ -182,7 +185,7 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAd { /* error, we expected more acks since the messages are processed in order, this should not happen, so this is an error situation */ - CLibCEC::AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft); m_message->state = ADAPTER_MESSAGE_STATE_ERROR; } } @@ -196,7 +199,7 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess { { CLockObject lock(m_mutex); - CLibCEC::AddLog(CEC_LOG_DEBUG, "%s - received response", ToString()); + m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str()); m_message->response = message.packet; if (m_message->IsTranmission()) m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; @@ -213,14 +216,42 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) { Clear(); + StopThread(0); } void CCECAdapterMessageQueue::Clear(void) { + StopThread(5); CLockObject lock(m_mutex); + m_writeQueue.Clear(); m_messages.clear(); } +void *CCECAdapterMessageQueue::Process(void) +{ + CCECAdapterMessageQueueEntry *message(NULL); + while (!IsStopped()) + { + /* wait for a new message */ + if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message) + { + /* write this message */ + { + CLockObject lock(m_mutex); + m_com->WriteToDevice(message->m_message); + } + if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR || + message->m_message->Message() == MSGCODE_START_BOOTLOADER) + { + message->Signal(); + Clear(); + break; + } + } + } + return NULL; +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -233,7 +264,7 @@ void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { /* the message wasn't handled */ bool bIsError(m_com->HandlePoll(msg)); - CLibCEC::AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString()); + m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString()); /* push this message to the current frame */ if (!bIsError && msg.PushToCecCommand(m_currentCECFrame)) @@ -282,37 +313,34 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_com->SetLineTimeout(msg->lineTimeout); } - CCECAdapterMessageQueueEntry *entry(NULL); + CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg); uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) { CLockObject lock(m_mutex); - entry = new CCECAdapterMessageQueueEntry(msg); iEntryId = m_iNextMessage++; m_messages.insert(make_pair(iEntryId, entry)); } - /* TODO write the message async */ - if (!m_com->WriteToDevice(msg)) - { - /* error! */ - Clear(); - return false; - } + /* add the message to the write queue */ + m_writeQueue.Push(entry); bool bReturn(true); if (entry) { if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout)) { - CLibCEC::AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); + m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message())); msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; bReturn = false; } - CLockObject lock(m_mutex); - m_messages.erase(iEntryId); + if (msg->Message() != MSGCODE_START_BOOTLOADER) + { + CLockObject lock(m_mutex); + m_messages.erase(iEntryId); + } delete entry; }