From a8f0bd18be9ae8f70822b0b22038f40d12b4fcad Mon Sep 17 00:00:00 2001 From: Lars Op den Kamp Date: Mon, 3 Oct 2011 01:29:58 +0200 Subject: [PATCH] cec: created a separate reader thread and fixed the 'lock timeout' bug --- src/lib/CECParser.cpp | 229 +++++-------------------------- src/lib/CECParser.h | 9 +- src/lib/Communication.cpp | 275 ++++++++++++++++++++++++++++++++++++++ src/lib/Communication.h | 77 +++++++++++ src/lib/Makefile.am | 2 + src/lib/util/threads.cpp | 13 +- src/lib/util/threads.h | 2 + 7 files changed, 408 insertions(+), 199 deletions(-) create mode 100644 src/lib/Communication.cpp create mode 100644 src/lib/Communication.h diff --git a/src/lib/CECParser.cpp b/src/lib/CECParser.cpp index 8273137..5ac1d8e 100644 --- a/src/lib/CECParser.cpp +++ b/src/lib/CECParser.cpp @@ -42,6 +42,7 @@ #include "util/threads.h" #include "util/timeutils.h" #include "CECDetect.h" +#include "Communication.h" using namespace CEC; using namespace std; @@ -53,62 +54,59 @@ using namespace std; */ //@{ CCECParser::CCECParser(const char *strDeviceName, cec_logical_address iLogicalAddress /* = CECDEVICE_PLAYBACKDEVICE1 */, int iPhysicalAddress /* = CEC_DEFAULT_PHYSICAL_ADDRESS*/) : - m_inbuf(NULL), - m_iInbufSize(0), - m_iInbufUsed(0), m_iCurrentButton(CEC_USER_CONTROL_CODE_UNKNOWN), m_physicaladdress(iPhysicalAddress), m_iLogicalAddress(iLogicalAddress), m_strDeviceName(strDeviceName), m_bRunning(false) { - m_serialport = new CSerialPort; + m_communication = new CCommunication(this); } CCECParser::~CCECParser(void) { Close(0); - m_serialport->Close(); - delete m_serialport; + m_communication->Close(); + delete m_communication; } bool CCECParser::Open(const char *strPort, int iTimeoutMs /* = 10000 */) { - bool bReturn(false); + if (!m_communication) + return false; - if (!(bReturn = m_serialport->Open(strPort, 38400))) + if (m_communication->IsOpen()) { - CStdString strError; - strError.Format("error opening serial port '%s': %s", strPort, m_serialport->GetError().c_str()); - AddLog(CEC_LOG_ERROR, strError); - return bReturn; + AddLog(CEC_LOG_ERROR, "connection already open"); + return false; } - //clear any input bytes - uint8_t buff[1024]; - m_serialport->Read(buff, sizeof(buff), CEC_SETTLE_DOWN_TIME); - - if (bReturn) - bReturn = SetLogicalAddress(m_iLogicalAddress); + if (!m_communication->Open(strPort, 38400, iTimeoutMs)) + { + AddLog(CEC_LOG_ERROR, "could not open a connection"); + return false; + } - if (!bReturn) + if (!SetLogicalAddress(m_iLogicalAddress)) { - CStdString strError; - strError.Format("error opening serial port '%s': %s", strPort, m_serialport->GetError().c_str()); - AddLog(CEC_LOG_ERROR, strError); - return bReturn; + AddLog(CEC_LOG_ERROR, "could not set the logical address"); + return false; } - if (bReturn) + if (pthread_create(&m_thread, NULL, (void *(*) (void *))&CCECParser::ThreadHandler, (void *)this) == 0) { m_bRunning = true; - if (pthread_create(&m_thread, NULL, (void *(*) (void *))&CCECParser::ThreadHandler, (void *)this) == 0) - pthread_detach(m_thread); - else - m_bRunning = false; + AddLog(CEC_LOG_DEBUG, "processor thread created"); + pthread_detach(m_thread); + return true; + } + else + { + AddLog(CEC_LOG_ERROR, "could not create a processor thread"); + m_bRunning = false; } - return bReturn; + return false; } bool CCECParser::Close(int iTimeoutMs /* = 2000 */) @@ -141,26 +139,16 @@ bool CCECParser::Process(void) int64_t now = GetTimeMs(); while (m_bRunning) { - { - CLockObject lock(&m_mutex, 1000); - if (lock.IsLocked()) - { - if (!ReadFromDevice(100)) - { - m_bRunning = false; - return false; - } - } - } + cec_frame msg; + while (m_bRunning && m_communication->IsOpen() && m_communication->Read(msg, 500)) + ParseMessage(msg); - //AddLog(CEC_LOG_DEBUG, "processing messages"); - ProcessMessages(); now = GetTimeMs(); CheckKeypressTimeout(now); CCondition::Sleep(50); } - AddLog(CEC_LOG_DEBUG, "reader thread terminated"); + AddLog(CEC_LOG_DEBUG, "processor thread terminated"); m_bRunning = false; m_exitCondition.Signal(); return true; @@ -278,7 +266,7 @@ bool CCECParser::SetInactiveView(void) bool CCECParser::GetNextLogMessage(cec_log_message *message) { - return m_bRunning ? m_logBuffer.Pop(*message) : false; + return m_logBuffer.Pop(*message); } bool CCECParser::GetNextKeypress(cec_keypress *key) @@ -390,21 +378,8 @@ void CCECParser::BroadcastActiveSource(void) bool CCECParser::TransmitFormatted(const cec_frame &data, bool bWaitForAck /* = true */, int64_t iTimeout /* = 2000 */) { - CLockObject lock(&m_mutex, iTimeout); - if (!lock.IsLocked()) - { - AddLog(CEC_LOG_ERROR, "could not get a write lock"); - return false; - } - - if (m_serialport->Write(data) != data.size()) - { - CStdString strError; - strError.Format("error writing to serial port: %s", m_serialport->GetError().c_str()); - AddLog(CEC_LOG_ERROR, strError); + if (!m_communication || m_communication->Write(data) != data.size()) return false; - } - AddLog(CEC_LOG_DEBUG, "command sent"); CCondition::Sleep((int) data.size() * 24 /*data*/ + 5 /*start bit (4.5 ms)*/ + 50 /* to be on the safe side */); if (bWaitForAck && !WaitForAck()) @@ -470,14 +445,8 @@ bool CCECParser::WaitForAck(int64_t iTimeout /* = 1000 */) while (!bGotAck && !bError && (iTimeout <= 0 || iNow < iTargetTime)) { - if (!ReadFromDevice((int) iTimeout)) - { - AddLog(CEC_LOG_ERROR, "failed to read from device"); - return false; - } - cec_frame msg; - while (!bGotAck && !bError && GetMessage(msg, false)) + while (!bGotAck && !bError && m_communication->Read(msg, iTimeout)) { uint8_t iCode = msg[0] & ~(MSGCODE_FRAME_EOM | MSGCODE_FRAME_ACK); @@ -527,119 +496,6 @@ bool CCECParser::WaitForAck(int64_t iTimeout /* = 1000 */) return bGotAck && !bError; } -bool CCECParser::ReadFromDevice(int iTimeout) -{ - uint8_t buff[1024]; - int iBytesRead = m_serialport->Read(buff, sizeof(buff), iTimeout); - if (iBytesRead < 0) - { - CStdString strError; - strError.Format("error reading from serial port: %s", m_serialport->GetError().c_str()); - AddLog(CEC_LOG_ERROR, strError); - return false; - } - else if (iBytesRead > 0) - AddData(buff, iBytesRead); - - return true; -} - -void CCECParser::ProcessMessages(void) -{ - cec_frame msg; - while (m_bRunning && GetMessage(msg)) - ParseMessage(msg); -} - -bool CCECParser::GetMessage(cec_frame &msg, bool bFromBuffer /* = true */) -{ - if (bFromBuffer && m_frameBuffer.Pop(msg)) - return true; - - if (m_iInbufUsed < 1) - return false; - - //search for first start of message - int startpos = -1; - for (int i = 0; i < m_iInbufUsed; i++) - { - if (m_inbuf[i] == MSGSTART) - { - startpos = i; - break; - } - } - - if (startpos == -1) - return false; - - //move anything from the first start of message to the beginning of the buffer - if (startpos > 0) - { - memmove(m_inbuf, m_inbuf + startpos, m_iInbufUsed - startpos); - m_iInbufUsed -= startpos; - } - - if (m_iInbufUsed < 2) - return false; - - //look for end of message - startpos = -1; - int endpos = -1; - for (int i = 1; i < m_iInbufUsed; i++) - { - if (m_inbuf[i] == MSGEND) - { - endpos = i; - break; - } - else if (m_inbuf[i] == MSGSTART) - { - startpos = i; - break; - } - } - - if (startpos > 0) //we found a msgstart before msgend, this is not right, remove - { - AddLog(CEC_LOG_ERROR, "received MSGSTART before MSGEND"); - memmove(m_inbuf, m_inbuf + startpos, m_iInbufUsed - startpos); - m_iInbufUsed -= startpos; - return false; - } - - if (endpos > 0) //found a MSGEND - { - msg.clear(); - bool isesc = false; - for (int i = 1; i < endpos; i++) - { - if (isesc) - { - msg.push_back(m_inbuf[i] + (uint8_t)ESCOFFSET); - isesc = false; - } - else if (m_inbuf[i] == MSGESC) - { - isesc = true; - } - else - { - msg.push_back(m_inbuf[i]); - } - } - - if (endpos + 1 < m_iInbufUsed) - memmove(m_inbuf, m_inbuf + endpos + 1, m_iInbufUsed - endpos - 1); - - m_iInbufUsed -= endpos + 1; - - return true; - } - - return false; -} - void CCECParser::ParseMessage(cec_frame &msg) { if (msg.empty()) @@ -806,18 +662,6 @@ void CCECParser::ParseCurrentFrame(void) } } -void CCECParser::AddData(uint8_t *data, int iLen) -{ - if (iLen + m_iInbufUsed > m_iInbufSize) - { - m_iInbufSize = iLen + m_iInbufUsed; - m_inbuf = (uint8_t*)realloc(m_inbuf, m_iInbufSize); - } - - memcpy(m_inbuf + m_iInbufUsed, data, iLen); - m_iInbufUsed += iLen; -} - void CCECParser::PushEscaped(cec_frame &vec, uint8_t byte) { if (byte >= MSGESC && byte != MSGSTART) @@ -864,10 +708,9 @@ bool CCECParser::SetAckMask(uint16_t iMask) PushEscaped(output, (uint8_t)iMask); output.push_back(MSGEND); - if (m_serialport->Write(output) == -1) + if (m_communication->Write(output) == -1) { - strLog.Format("error writing to serial port: %s", m_serialport->GetError().c_str()); - AddLog(CEC_LOG_ERROR, strLog); + AddLog(CEC_LOG_ERROR, "could not set the ackmask"); return false; } diff --git a/src/lib/CECParser.h b/src/lib/CECParser.h index ef39cfb..d429c92 100644 --- a/src/lib/CECParser.h +++ b/src/lib/CECParser.h @@ -41,6 +41,8 @@ class CSerialPort; namespace CEC { + class CCommunication; + class CCECParser : public ICECDevice { public: @@ -73,6 +75,7 @@ namespace CEC static void *ThreadHandler(CCECParser *parser); bool Process(void); + void AddLog(cec_log_level level, const std::string &strMessage); protected: virtual bool TransmitFormatted(const cec_frame &data, bool bWaitForAck = true, int64_t iTimeout = (int64_t) 2000); virtual void TransmitAbort(cec_logical_address address, cec_opcode opcode, ECecAbortReason reason = CEC_ABORT_REASON_UNRECOGNIZED_OPCODE); @@ -88,7 +91,6 @@ namespace CEC private: void AddKey(void); void AddCommand(cec_logical_address source, cec_logical_address destination, cec_opcode opcode, cec_frame *parameters); - void AddLog(cec_log_level level, const std::string &strMessage); bool WaitForAck(int64_t iTimeout = (int64_t) 1000); bool ReadFromDevice(int iTimeout); void ProcessMessages(void); @@ -101,10 +103,6 @@ namespace CEC void CheckKeypressTimeout(int64_t now); - uint8_t* m_inbuf; - int m_iInbufSize; - int m_iInbufUsed; - CSerialPort * m_serialport; cec_frame m_currentframe; cec_user_control_code m_iCurrentButton; int64_t m_buttontime; @@ -119,5 +117,6 @@ namespace CEC CMutex m_mutex; CCondition m_exitCondition; bool m_bRunning; + CCommunication *m_communication; }; }; diff --git a/src/lib/Communication.cpp b/src/lib/Communication.cpp new file mode 100644 index 0000000..f000046 --- /dev/null +++ b/src/lib/Communication.cpp @@ -0,0 +1,275 @@ +/* + * This file is part of the libCEC(R) library. + * + * libCEC(R) is Copyright (C) 2011 Pulse-Eight Limited. All rights reserved. + * libCEC(R) is an original work, containing original code. + * + * libCEC(R) is a trademark of Pulse-Eight Limited. + * + * This program is dual-licensed; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * Alternatively, you can license this library under a commercial license, + * please contact Pulse-Eight Licensing for more information. + * + * For more information contact: + * Pulse-Eight Licensing + * http://www.pulse-eight.com/ + * http://www.pulse-eight.net/ + */ + +#include "Communication.h" +#include "CECParser.h" +#include "libPlatform/serialport.h" +#include "util/StdString.h" + +using namespace std; +using namespace CEC; + +CCommunication::CCommunication(CCECParser *parser) : + m_parser(parser), + m_inbuf(NULL), + m_iInbufSize(0), + m_iInbufUsed(0), + m_bStarted(false), + m_bStop(false) +{ + m_port = new CSerialPort; +} + +CCommunication::~CCommunication(void) +{ + m_port->Close(); + m_port = NULL; +} + +bool CCommunication::Open(const char *strPort, int iBaudRate /* = 38400 */, int iTimeoutMs /* = 10000 */) +{ + CLockObject lock(&m_commMutex); + if (m_bStarted) + return false; + + if (!m_port->Open(strPort, iBaudRate)) + { + CStdString strError; + strError.Format("error opening serial port '%s': %s", strPort, m_port->GetError().c_str()); + m_parser->AddLog(CEC_LOG_ERROR, strError); + return false; + } + + m_parser->AddLog(CEC_LOG_DEBUG, "connection opened"); + + //clear any input bytes + uint8_t buff[1024]; + m_port->Read(buff, sizeof(buff), 50); + + CCondition::Sleep(CEC_SETTLE_DOWN_TIME); + + m_bStop = false; + m_bStarted = true; + if (pthread_create(&m_thread, NULL, (void *(*) (void *))&CCommunication::ReaderThreadHandler, (void *)this) == 0) + { + m_parser->AddLog(CEC_LOG_DEBUG, "reader thread created"); + pthread_detach(m_thread); + return true; + } + else + { + m_parser->AddLog(CEC_LOG_DEBUG, "could not create a reader thread"); + } + + return false; +} + +void *CCommunication::ReaderThreadHandler(CCommunication *comm) +{ + if (comm) + comm->ReaderProcess(); + + return NULL; +} + +void CCommunication::Close(void) +{ + m_bStop = true; + pthread_join(m_thread, NULL); + m_port->Close(); +} + +void *CCommunication::ReaderProcess(void) +{ + while (!m_bStop) + { + if (!ReadFromDevice(250)) + { + m_bStarted = false; + break; + } + + CCondition::Sleep(50); + } + + m_parser->AddLog(CEC_LOG_DEBUG, "reader thread terminated"); + + CLockObject lock(&m_commMutex); + m_bStarted = false; + return NULL; +} + +bool CCommunication::ReadFromDevice(int iTimeout) +{ + uint8_t buff[1024]; + CLockObject lock(&m_commMutex); + int iBytesRead = m_port->Read(buff, sizeof(buff), iTimeout); + lock.Leave(); + if (iBytesRead < 0) + { + CStdString strError; + strError.Format("error reading from serial port: %s", m_port->GetError().c_str()); + m_parser->AddLog(CEC_LOG_ERROR, strError); + return false; + } + else if (iBytesRead > 0) + AddData(buff, iBytesRead); + + return true; +} + +void CCommunication::AddData(uint8_t *data, int iLen) +{ + CLockObject lock(&m_bufferMutex); + if (iLen + m_iInbufUsed > m_iInbufSize) + { + m_iInbufSize = iLen + m_iInbufUsed; + m_inbuf = (uint8_t*)realloc(m_inbuf, m_iInbufSize); + } + + memcpy(m_inbuf + m_iInbufUsed, data, iLen); + m_iInbufUsed += iLen; + lock.Leave(); + m_condition.Signal(); +} + +bool CCommunication::Write(const cec_frame &data) +{ + CLockObject lock(&m_commMutex); + + if (m_port->Write(data) != data.size()) + { + CStdString strError; + strError.Format("error writing to serial port: %s", m_port->GetError().c_str()); + m_parser->AddLog(CEC_LOG_ERROR, strError); + return false; + } + + m_parser->AddLog(CEC_LOG_DEBUG, "command sent"); + return true; +} + +bool CCommunication::Read(cec_frame &msg, int iTimeout) +{ + CLockObject lock(&m_bufferMutex); + + while (m_iInbufUsed < 1) + m_condition.Wait(&m_bufferMutex, iTimeout); + + if (m_iInbufUsed < 1) + return false; + + //search for first start of message + int startpos = -1; + for (int i = 0; i < m_iInbufUsed; i++) + { + if (m_inbuf[i] == MSGSTART) + { + startpos = i; + break; + } + } + + if (startpos == -1) + return false; + + //move anything from the first start of message to the beginning of the buffer + if (startpos > 0) + { + memmove(m_inbuf, m_inbuf + startpos, m_iInbufUsed - startpos); + m_iInbufUsed -= startpos; + } + + if (m_iInbufUsed < 2) + return false; + + //look for end of message + startpos = -1; + int endpos = -1; + for (int i = 1; i < m_iInbufUsed; i++) + { + if (m_inbuf[i] == MSGEND) + { + endpos = i; + break; + } + else if (m_inbuf[i] == MSGSTART) + { + startpos = i; + break; + } + } + + if (startpos > 0) //we found a msgstart before msgend, this is not right, remove + { + m_parser->AddLog(CEC_LOG_ERROR, "received MSGSTART before MSGEND"); + memmove(m_inbuf, m_inbuf + startpos, m_iInbufUsed - startpos); + m_iInbufUsed -= startpos; + return false; + } + + if (endpos > 0) //found a MSGEND + { + msg.clear(); + bool isesc = false; + for (int i = 1; i < endpos; i++) + { + if (isesc) + { + msg.push_back(m_inbuf[i] + (uint8_t)ESCOFFSET); + isesc = false; + } + else if (m_inbuf[i] == MSGESC) + { + isesc = true; + } + else + { + msg.push_back(m_inbuf[i]); + } + } + + if (endpos + 1 < m_iInbufUsed) + memmove(m_inbuf, m_inbuf + endpos + 1, m_iInbufUsed - endpos - 1); + + m_iInbufUsed -= endpos + 1; + + return true; + } + + return false; +} + +std::string CCommunication::GetError(void) const +{ + return m_port->GetError(); +} diff --git a/src/lib/Communication.h b/src/lib/Communication.h new file mode 100644 index 0000000..82afbb6 --- /dev/null +++ b/src/lib/Communication.h @@ -0,0 +1,77 @@ +#pragma once +/* + * This file is part of the libCEC(R) library. + * + * libCEC(R) is Copyright (C) 2011 Pulse-Eight Limited. All rights reserved. + * libCEC(R) is an original work, containing original code. + * + * libCEC(R) is a trademark of Pulse-Eight Limited. + * + * This program is dual-licensed; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * Alternatively, you can license this library under a commercial license, + * please contact Pulse-Eight Licensing for more information. + * + * For more information contact: + * Pulse-Eight Licensing + * http://www.pulse-eight.com/ + * http://www.pulse-eight.net/ + */ + +#include +#include +#include "../../include/CECExports.h" +#include "../../include/CECTypes.h" +#include "util/buffer.h" + +class CSerialPort; + +namespace CEC +{ + class CCECParser; + + class CCommunication + { + public: + CCommunication(CCECParser *parser); + virtual ~CCommunication(); + + bool Open(const char *strPort, int iBaudRate = 38400, int iTimeoutMs = 10000); + bool Read(cec_frame &msg, int iTimeout = 1000); + bool Write(const cec_frame &frame); + void Close(void); + bool IsOpen(void) const { return !m_bStop && m_bStarted; } + std::string GetError(void) const; + + static void *ReaderThreadHandler(CCommunication *reader); + void *ReaderProcess(void); + private: + void AddData(uint8_t *data, int iLen); + bool ReadFromDevice(int iTimeout); + + CSerialPort * m_port; + CCECParser * m_parser; + uint8_t* m_inbuf; + int m_iInbufSize; + int m_iInbufUsed; + bool m_bStarted; + bool m_bStop; + pthread_t m_thread; + CMutex m_commMutex; + CMutex m_bufferMutex; + CCondition m_condition; + }; +}; diff --git a/src/lib/Makefile.am b/src/lib/Makefile.am index ca89d5a..586e1b1 100644 --- a/src/lib/Makefile.am +++ b/src/lib/Makefile.am @@ -13,6 +13,8 @@ libcec_la_SOURCES = CECParser.cpp \ CECParserC.cpp \ CECDetect.cpp \ CECDetect.h \ + Communication.cpp \ + Communication.h \ ../../include/CECExports.h \ ../../include/CECExportsCpp.h \ ../../include/CECExportsC.h \ diff --git a/src/lib/util/threads.cpp b/src/lib/util/threads.cpp index 3e40c2b..0e188c3 100644 --- a/src/lib/util/threads.cpp +++ b/src/lib/util/threads.cpp @@ -82,10 +82,21 @@ CLockObject::CLockObject(CMutex *mutex, int64_t iTimeout /* = -1 */) : } CLockObject::~CLockObject(void) +{ + Leave(); + m_mutex = NULL; +} + +void CLockObject::Leave(void) { m_mutex->Unlock(); m_bLocked = false; - m_mutex = NULL; +} + +void CLockObject::Lock(void) +{ + m_mutex->Lock(); + m_bLocked = true; } CCondition::CCondition(void) diff --git a/src/lib/util/threads.h b/src/lib/util/threads.h index 41ad2b5..6f6288c 100644 --- a/src/lib/util/threads.h +++ b/src/lib/util/threads.h @@ -73,6 +73,8 @@ public: ~CLockObject(void); bool IsLocked(void) const { return m_bLocked; } + void Leave(void); + void Lock(void); private: CMutex *m_mutex; -- 2.34.1