X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Flib%2Fadapter%2FUSBCECAdapterMessageQueue.cpp;h=a270a78235792a39a81a6fd746088c7155cea175;hb=466925f5c43536e5fd96632615810da783b78096;hp=56c81a6e4709438eef082d9aaf066a8380382856;hpb=95acc41b63e3a1f0af9295cbe5adc1210feefb79;p=deb_libcec.git diff --git a/src/lib/adapter/USBCECAdapterMessageQueue.cpp b/src/lib/adapter/USBCECAdapterMessageQueue.cpp index 56c81a6..a270a78 100644 --- a/src/lib/adapter/USBCECAdapterMessageQueue.cpp +++ b/src/lib/adapter/USBCECAdapterMessageQueue.cpp @@ -39,6 +39,8 @@ using namespace CEC; using namespace PLATFORM; using namespace std; +#define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000 + CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessage *message) : m_message(message), m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1), @@ -213,14 +215,42 @@ bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMess CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void) { Clear(); + StopThread(0); } void CCECAdapterMessageQueue::Clear(void) { + StopThread(5); CLockObject lock(m_mutex); + m_writeQueue.Clear(); m_messages.clear(); } +void *CCECAdapterMessageQueue::Process(void) +{ + CCECAdapterMessageQueueEntry *message(NULL); + while (!IsStopped()) + { + /* wait for a new message */ + if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message) + { + /* write this message */ + { + CLockObject lock(m_mutex); + m_com->WriteToDevice(message->m_message); + } + if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR || + message->m_message->Message() == MSGCODE_START_BOOTLOADER) + { + message->Signal(); + Clear(); + break; + } + } + } + return NULL; +} + void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg) { bool bHandled(false); @@ -282,24 +312,18 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) m_com->SetLineTimeout(msg->lineTimeout); } - CCECAdapterMessageQueueEntry *entry(NULL); + CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(msg); uint64_t iEntryId(0); /* add to the wait for ack queue */ if (msg->Message() != MSGCODE_START_BOOTLOADER) { CLockObject lock(m_mutex); - entry = new CCECAdapterMessageQueueEntry(msg); iEntryId = m_iNextMessage++; m_messages.insert(make_pair(iEntryId, entry)); } - /* TODO write the message async */ - if (!m_com->WriteToDevice(msg)) - { - /* error! */ - Clear(); - return false; - } + /* add the message to the write queue */ + m_writeQueue.Push(entry); bool bReturn(true); if (entry) @@ -311,8 +335,11 @@ bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg) bReturn = false; } - CLockObject lock(m_mutex); - m_messages.erase(iEntryId); + if (msg->Message() != MSGCODE_START_BOOTLOADER) + { + CLockObject lock(m_mutex); + m_messages.erase(iEntryId); + } delete entry; }