CLibCEC *instance = CLibCEC::GetInstance();
if (!instance)
return;
- CLockObject lock(instance->m_mutex);
+ CLockObject lock(instance->m_logMutex);
cec_log_message message;
message.level = level;
ICECCallbacks * m_callbacks;
void * m_cbParam;
PLATFORM::CMutex m_mutex;
+ PLATFORM::CMutex m_logMutex;
};
};
m_commands = new CUSBCECAdapterCommands(this);
if (!m_adapterMessageQueue)
+ {
m_adapterMessageQueue = new CCECAdapterMessageQueue(this);
+ m_adapterMessageQueue->CreateThread();
+ }
/* try to open the connection */
CStdString strError;
SetControlledMode(false);
}
+ m_adapterMessageQueue->Clear();
+
/* stop and delete the ping thread */
if (m_pingThread)
m_pingThread->StopThread(0);
/* send the message */
bRetry = (!m_adapterMessageQueue->Write(output) || output->NeedsRetry()) && output->transmit_timeout > 0;
- if (output->state == ADAPTER_MESSAGE_STATE_ERROR)
- Close();
- else if (bRetry)
+ if (bRetry)
Sleep(CEC_DEFAULT_TRANSMIT_RETRY_WAIT);
retVal = output->state;
{
CLibCEC::AddLog(CEC_LOG_DEBUG, "error writing command '%s' to serial port '%s': %s", CCECAdapterMessage::ToString(message->Message()), m_port->GetName().c_str(), m_port->GetError().c_str());
message->state = ADAPTER_MESSAGE_STATE_ERROR;
+ Close();
return false;
}
CLockObject lock(m_mutex);
if (!m_port || !m_port->IsOpen())
return false;
+
iBytesRead = m_port->Read(buff, sizeof(uint8_t) * iSize, iTimeout);
+
+ if (m_port->GetErrorNumber())
+ {
+ CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str());
+ m_port->Close();
+ return false;
+ }
}
if (iBytesRead < 0 || iBytesRead > 256)
- {
- CLibCEC::AddLog(CEC_LOG_ERROR, "error reading from serial port: %s", m_port->GetError().c_str());
- Close();
return false;
- }
else if (iBytesRead > 0)
{
/* add the data to the current frame */
CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
{
Clear();
+ StopThread(0);
}
void CCECAdapterMessageQueue::Clear(void)
{
+ StopThread(5);
CLockObject lock(m_mutex);
+ m_writeQueue.Clear();
m_messages.clear();
}
+void *CCECAdapterMessageQueue::Process(void)
+{
+ CCECAdapterMessageQueueEntry *message(NULL);
+ while (!IsStopped())
+ {
+ /* wait for a new message */
+ if (m_writeQueue.Pop(message, 1000))
+ {
+ /* write this message */
+ m_com->WriteToDevice(message->m_message);
+ if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR)
+ {
+ message->Signal();
+ Clear();
+ break;
+ }
+ }
+ }
+ return NULL;
+}
+
void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
{
bool bHandled(false);
m_messages.insert(make_pair(iEntryId, entry));
}
- /* TODO write the message async */
- if (!m_com->WriteToDevice(msg))
- {
- /* error! */
- Clear();
- return false;
- }
+ /* add the message to the write queue */
+ m_writeQueue.Push(entry);
bool bReturn(true);
if (entry)
*/
#include "USBCECAdapterMessage.h"
+#include "../platform/threads/threads.h"
#include <map>
namespace CEC
*/
const char *ToString(void) const;
- private:
/*!
* @brief Called when a 'command accepted' message was received.
* @param message The message that was received.
PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */
};
- class CCECAdapterMessageQueue
+ class CCECAdapterMessageQueue : public PLATFORM::CThread
{
friend class CUSBCECAdapterCommunication;
* @param iQueueSize The outgoing message queue size.
*/
CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
+ PLATFORM::CThread(),
m_com(com),
m_iNextMessage(0) {}
virtual ~CCECAdapterMessageQueue(void);
*/
bool Write(CCECAdapterMessage *msg);
+ virtual void *Process(void);
+
private:
CUSBCECAdapterCommunication * m_com; /**< the communication handler */
PLATFORM::CMutex m_mutex; /**< mutex for changes to this class */
std::map<uint64_t, CCECAdapterMessageQueueEntry *> m_messages; /**< the outgoing message queue */
+ PLATFORM::SyncedBuffer<CCECAdapterMessageQueueEntry *> m_writeQueue; /**< the queue for messages that are to be written */
uint64_t m_iNextMessage; /**< the index of the next message */
CCECAdapterMessage m_incomingAdapterMessage; /**< the current incoming message that's being assembled */
cec_command m_currentCECFrame; /**< the current incoming CEC command that's being assembled */
{
public:
SyncedBuffer(size_t iMaxSize = 100) :
- m_maxSize(iMaxSize) {}
+ m_maxSize(iMaxSize),
+ m_bHasMessages(false) {}
virtual ~SyncedBuffer(void)
{
CLockObject lock(m_mutex);
while (!m_buffer.empty())
m_buffer.pop();
+ m_condition.Broadcast();
}
size_t Size(void)
return false;
m_buffer.push(entry);
+ m_bHasMessages = true;
+ m_condition.Signal();
return true;
}
- bool Pop(_BType &entry)
+ bool Pop(_BType &entry, uint32_t iTimeoutMs = 0)
{
bool bReturn(false);
CLockObject lock(m_mutex);
+
+ // wait for a signal if the buffer is empty
+ if (m_buffer.empty() && iTimeoutMs > 0)
+ {
+ if (!m_condition.Wait(m_mutex, m_bHasMessages, iTimeoutMs))
+ return bReturn;
+ }
+
+ // pop the first item
if (!m_buffer.empty())
{
entry = m_buffer.front();
m_buffer.pop();
+ m_bHasMessages = !m_buffer.empty();
bReturn = true;
}
return bReturn;
size_t m_maxSize;
std::queue<_BType> m_buffer;
CMutex m_mutex;
+ CCondition<bool> m_condition;
+ bool m_bHasMessages;
};
};