2 * This file is part of the libCEC(R) library.
4 * libCEC(R) is Copyright (C) 2011-2012 Pulse-Eight Limited. All rights reserved.
5 * libCEC(R) is an original work, containing original code.
7 * libCEC(R) is a trademark of Pulse-Eight Limited.
9 * This program is dual-licensed; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
24 * Alternatively, you can license this library under a commercial license,
25 * please contact Pulse-Eight Licensing for more information.
27 * For more information contact:
28 * Pulse-Eight Licensing <license@pulse-eight.com>
29 * http://www.pulse-eight.com/
30 * http://www.pulse-eight.net/
34 #include "USBCECAdapterMessageQueue.h"
36 #include "USBCECAdapterCommunication.h"
37 #include "USBCECAdapterMessage.h"
38 #include "lib/platform/sockets/socket.h"
39 #include "lib/LibCEC.h"
40 #include "lib/platform/util/StdString.h"
// NOTE(review): `vector`, `map` and `make_pair` are used unqualified further down in
// this file, so additional using-directives are presumably among the listing lines
// that are absent around this one - confirm against the upstream file.
43 using namespace PLATFORM
;
// Timeout handed to m_writeQueue.Pop() in CCECAdapterMessageQueue::Process().
// Presumably expressed in milliseconds - confirm against the queue implementation.
46 #define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
/*
 * Creates a queue entry for `message` owned by `queue`.
 *
 * Visible initialisers:
 *  - m_iPacketsLeft: message->Size() / 4 when the message is a transmission, 1 otherwise.
 *  - m_queueTimeout: initialised from message->transmit_timeout.
 *
 * `message` is dereferenced unconditionally, so it must not be NULL.
 *
 * NOTE(review): listing lines 49-50 and 52-53 are absent from this extract, so the
 * initialisers of the remaining members (m_queue, m_message, m_bSucceeded, ...) are
 * not visible here - restore them from the upstream file.
 */
48 CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue
*queue
, CCECAdapterMessage
*message
) :
51 m_iPacketsLeft(message
->IsTranmission() ? message
->Size() / 4 : 1),
54 m_queueTimeout(message
->transmit_timeout
) {}
56 CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { }
58 void CCECAdapterMessageQueueEntry::Broadcast(void)
60 CLockObject
lock(m_mutex
);
61 m_condition
.Broadcast();
/*
 * Offers an incoming adapter message to this entry.
 *
 * Only when IsResponse(message) holds is the message dispatched on its message code:
 *  - MSGCODE_COMMAND_ACCEPTED   -> MessageReceivedCommandAccepted()
 *  - MSGCODE_TRANSMIT_SUCCEEDED -> MessageReceivedTransmitSucceeded()
 *  - the third visible assignment calls MessageReceivedResponse(); presumably this is
 *    the default case - confirm.
 * The handler's result is stored in bHandled.
 *
 * NOTE(review): the declaration of bHandled, the break statements, the label in front
 * of the third assignment and the return statement are on listing lines that are
 * absent from this extract (65-67, 74, 77-78, 80-85).
 */
64 bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage
&message
)
68 if (IsResponse(message
))
70 switch (message
.Message())
72 case MSGCODE_COMMAND_ACCEPTED
:
73 bHandled
= MessageReceivedCommandAccepted(message
);
75 case MSGCODE_TRANSMIT_SUCCEEDED
:
76 bHandled
= MessageReceivedTransmitSucceeded(message
);
79 bHandled
= MessageReceivedResponse(message
);
/*
 * Acquires m_mutex.
 *
 * NOTE(review): the statements executed while the lock is held (listing lines 90-91)
 * are absent from this extract; presumably they mark the entry as succeeded and signal
 * m_condition - confirm against the upstream file.
 */
87 void CCECAdapterMessageQueueEntry::Signal(void)
89 CLockObject
lock(m_mutex
);
/*
 * Waits for this entry to be flagged as succeeded.
 *
 * With m_mutex held: when m_bSucceeded is already set the result is true and no wait
 * takes place; otherwise the result is whatever
 * m_condition.Wait(m_mutex, m_bSucceeded, iTimeout) yields.
 *
 * iTimeout is forwarded unchanged to the condition wait; its unit is not visible here
 * (presumably milliseconds - confirm).
 *
 * NOTE(review): the declaration of bReturn, any statement following the wait and the
 * return statement (listing lines 95-96, 98, 101-104) are absent from this extract.
 */
94 bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout
)
97 /* wait until we receive a signal when the transmission succeeded */
99 CLockObject
lock(m_mutex
);
100 bReturn
= m_bSucceeded
? true : m_condition
.Wait(m_mutex
, m_bSucceeded
, iTimeout
);
/*
 * Acquires m_mutex before producing its result.
 *
 * NOTE(review): the return statement (listing line 109) is absent from this extract,
 * so the member that is reported cannot be seen here.
 */
106 bool CCECAdapterMessageQueueEntry::IsWaiting(void)
108 CLockObject
lock(m_mutex
);
112 cec_adapter_messagecode
CCECAdapterMessageQueueEntry::MessageCode(void)
114 return m_message
->Message();
117 bool CCECAdapterMessageQueueEntry::IsResponseOld(const CCECAdapterMessage
&msg
)
119 cec_adapter_messagecode msgCode
= msg
.Message();
121 return msgCode
== MessageCode() ||
122 msgCode
== MSGCODE_COMMAND_ACCEPTED
||
123 msgCode
== MSGCODE_COMMAND_REJECTED
||
124 (m_message
->IsTranmission() && (msgCode
== MSGCODE_TIMEOUT_ERROR
||
125 msgCode
== MSGCODE_HIGH_ERROR
||
126 msgCode
== MSGCODE_LOW_ERROR
||
127 msgCode
== MSGCODE_RECEIVE_FAILED
||
128 msgCode
== MSGCODE_TRANSMIT_FAILED_LINE
||
129 msgCode
== MSGCODE_TRANSMIT_FAILED_ACK
||
130 msgCode
== MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA
||
131 msgCode
== MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE
||
132 msgCode
== MSGCODE_TRANSMIT_SUCCEEDED
));
/*
 * Decides whether `msg` is a response to the message held by this entry.
 *
 * Checks, in the order they appear:
 *  1. the queued message is already in state ADAPTER_MESSAGE_STATE_SENT_ACKED;
 *  2. msg has the same message code as the queued message ("always a response");
 *  3. when ProvidesExtendedResponse() is false, the decision is delegated to
 *     IsResponseOld(msg);
 *  4. msg.ResponseTo() is MSGCODE_NOTHING ("response without a msgcode");
 *  5. for the listed commands that are only answered with accepted/rejected, the
 *     result is (thisMsgCode == msgResponse);
 *  6. if the queued message is not a transmission, a "FIXME!" warning is logged;
 *  7. otherwise the result is true for an accepted/rejected message that responds to
 *     MSGCODE_TRANSMIT_ACK_POLARITY, MSGCODE_TRANSMIT or MSGCODE_TRANSMIT_EOM, or for
 *     one of the timeout / receive-failed / transmit-failed / transmit-succeeded codes.
 *
 * NOTE(review): the statements controlled by checks 1, 2 and 4, and whatever follows
 * the log call in check 6, are on listing lines absent from this extract (138, 146,
 * 153, 172, 174-175); presumably they are plain return statements - confirm.
 */
135 bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage
&msg
)
137 if (m_message
->state
== ADAPTER_MESSAGE_STATE_SENT_ACKED
)
140 cec_adapter_messagecode thisMsgCode
= m_message
->Message();
141 cec_adapter_messagecode msgCode
= msg
.Message();
142 cec_adapter_messagecode msgResponse
= msg
.ResponseTo();
144 // msgcode matches, always a response
145 if (msgCode
== MessageCode())
148 if (!ProvidesExtendedResponse())
149 return IsResponseOld(msg
);
151 // response without a msgcode
152 if (msgResponse
== MSGCODE_NOTHING
)
155 // commands that only respond with accepted/rejected
156 if (thisMsgCode
== MSGCODE_PING
||
157 thisMsgCode
== MSGCODE_SET_ACK_MASK
||
158 thisMsgCode
== MSGCODE_SET_CONTROLLED
||
159 thisMsgCode
== MSGCODE_SET_AUTO_ENABLED
||
160 thisMsgCode
== MSGCODE_SET_DEFAULT_LOGICAL_ADDRESS
||
161 thisMsgCode
== MSGCODE_SET_LOGICAL_ADDRESS_MASK
||
162 thisMsgCode
== MSGCODE_SET_PHYSICAL_ADDRESS
||
163 thisMsgCode
== MSGCODE_SET_DEVICE_TYPE
||
164 thisMsgCode
== MSGCODE_SET_HDMI_VERSION
||
165 thisMsgCode
== MSGCODE_SET_OSD_NAME
||
166 thisMsgCode
== MSGCODE_WRITE_EEPROM
||
167 thisMsgCode
== MSGCODE_TRANSMIT_IDLETIME
||
168 thisMsgCode
== MSGCODE_SET_ACTIVE_SOURCE
)
169 return thisMsgCode
== msgResponse
;
171 if (!m_message
->IsTranmission())
173 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_WARNING
, "FIXME! not a transmission: %s", msg
.ToString().c_str());
177 return ((msgCode
== MSGCODE_COMMAND_ACCEPTED
|| msgCode
== MSGCODE_COMMAND_REJECTED
) &&
178 (msgResponse
== MSGCODE_TRANSMIT_ACK_POLARITY
|| msgResponse
== MSGCODE_TRANSMIT
|| msgResponse
== MSGCODE_TRANSMIT_EOM
)) ||
179 msgCode
== MSGCODE_TIMEOUT_ERROR
||
180 msgCode
== MSGCODE_RECEIVE_FAILED
||
181 msgCode
== MSGCODE_TRANSMIT_FAILED_ACK
||
182 msgCode
== MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA
||
183 msgCode
== MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE
||
184 msgCode
== MSGCODE_TRANSMIT_SUCCEEDED
;
187 const char *CCECAdapterMessageQueueEntry::ToString(void) const
189 /* CEC transmissions got the 'set ack polarity' msgcode, which doesn't look nice */
190 if (m_message
->IsTranmission())
191 return "CEC transmission";
193 return CCECAdapterMessage::ToString(m_message
->Message());
/*
 * Handles a MSGCODE_COMMAND_ACCEPTED message for this entry (see MessageReceived()).
 *
 * Visible behaviour, with m_mutex held and only while m_iPacketsLeft > 0:
 *  - builds the log text "<entry> - command accepted", appending
 *    " - waiting for %d more" when packets are still outstanding, and logs it at
 *    CEC_LOG_DEBUG;
 *  - when the queued message is not a transmission and m_iPacketsLeft is 0, sets the
 *    message state to ADAPTER_MESSAGE_STATE_SENT_ACKED and stores message.packet as
 *    the response.
 *
 * NOTE(review): listing lines 203-207, 209, 214-215, 218 and 221-231 are absent from
 * this extract. That includes the declaration of strLog, any update of m_iPacketsLeft,
 * every assignment to bSendSignal / bHandled and the return statement, so the returned
 * value and the signalling behaviour cannot be confirmed from here.
 */
196 bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage
&message
)
198 bool bSendSignal(false);
199 bool bHandled(false);
201 CLockObject
lock(m_mutex
);
202 if (m_iPacketsLeft
> 0)
208 /* log this message */
210 strLog
.Format("%s - command accepted", ToString());
211 if (m_iPacketsLeft
> 0)
212 strLog
.AppendFormat(" - waiting for %d more", m_iPacketsLeft
);
213 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, strLog
);
216 /* no more packets left and not a transmission, so we're done */
217 if (!m_message
->IsTranmission() && m_iPacketsLeft
== 0)
219 m_message
->state
= ADAPTER_MESSAGE_STATE_SENT_ACKED
;
220 m_message
->response
= message
.packet
;
/*
 * Handles a MSGCODE_TRANSMIT_SUCCEEDED message for this entry (see MessageReceived()).
 *
 * Visible behaviour, with m_mutex held:
 *  - when m_iPacketsLeft is 0: logs "<message> - transmit succeeded" at CEC_LOG_DEBUG,
 *    sets the message state to ADAPTER_MESSAGE_STATE_SENT_ACKED and stores
 *    message.packet as the response;
 *  - the second visible group logs a warning that not enough 'command accepted'
 *    messages were received and sets the state to ADAPTER_MESSAGE_STATE_ERROR;
 *    presumably this is the else branch of the check above - confirm.
 *
 * NOTE(review): listing lines 234-235, 238, 240, 242, 245-247 and 252-258 are absent
 * from this extract, so the branch structure, any signalling and the return value are
 * not visible here.
 */
233 bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage
&message
)
236 CLockObject
lock(m_mutex
);
237 if (m_iPacketsLeft
== 0)
239 /* transmission succeeded, so we're done */
241 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, "%s - transmit succeeded", m_message
->ToString().c_str());
243 m_message
->state
= ADAPTER_MESSAGE_STATE_SENT_ACKED
;
244 m_message
->response
= message
.packet
;
248 /* error, we expected more acks
249 since the messages are processed in order, this should not happen, so this is an error situation */
250 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_WARNING
, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft
);
251 m_message
->state
= ADAPTER_MESSAGE_STATE_ERROR
;
/*
 * Handles any response that is neither 'command accepted' nor 'transmit succeeded'
 * (see MessageReceived()).
 *
 * Visible behaviour, with m_mutex held:
 *  - logs "<entry> - received response - <message>" at CEC_LOG_DEBUG;
 *  - stores message.packet as the response of the queued message;
 *  - for a transmission, the state becomes ADAPTER_MESSAGE_STATE_SENT_ACKED when the
 *    message code is MSGCODE_TRANSMIT_SUCCEEDED and
 *    ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED otherwise;
 *  - the last visible assignment sets ADAPTER_MESSAGE_STATE_SENT_ACKED; presumably it
 *    is the else branch for non-transmissions (listing line 273 is absent) - confirm.
 *
 * NOTE(review): the same log call appears twice, the second time guarded by
 * message.IsError(). Listing lines 264, 266 and 269 are absent; presumably they are
 * preprocessor conditionals selecting one of the two - confirm. The signalling and
 * the return statement (listing lines 275-280) are not visible either.
 */
260 bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage
&message
)
263 CLockObject
lock(m_mutex
);
265 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, "%s - received response - %s", ToString(), message
.ToString().c_str());
267 if (message
.IsError())
268 m_queue
->m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, "%s - received response - %s", ToString(), message
.ToString().c_str());
270 m_message
->response
= message
.packet
;
271 if (m_message
->IsTranmission())
272 m_message
->state
= message
.Message() == MSGCODE_TRANSMIT_SUCCEEDED
? ADAPTER_MESSAGE_STATE_SENT_ACKED
: ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED
;
274 m_message
->state
= ADAPTER_MESSAGE_STATE_SENT_ACKED
;
282 bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void)
284 return m_queue
&& m_queue
->ProvidesExtendedResponse();
287 bool CCECAdapterMessageQueueEntry::TimedOutOrSucceeded(void) const
289 return m_message
->bFireAndForget
&& (m_bSucceeded
|| m_queueTimeout
.TimeLeft() == 0);
/*
 * Creates the message queue for the given adapter communication object.
 *
 * Visible behaviour: allocates m_incomingAdapterMessage with new (released again in
 * the destructor) and clears m_currentCECFrame.
 *
 * NOTE(review): the member initialiser list (listing lines 293-295) is absent from
 * this extract, so how `com` and the remaining members are initialised is not visible.
 */
292 CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication
*com
) :
297 m_incomingAdapterMessage
= new CCECAdapterMessage
;
298 m_currentCECFrame
.Clear();
/*
 * Destructor. Visible behaviour: deletes m_incomingAdapterMessage, which the
 * constructor allocated with new.
 *
 * NOTE(review): listing lines 302-305 are absent from this extract; any clean-up that
 * happens before the delete is not visible here.
 */
301 CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
306 delete m_incomingAdapterMessage
;
/*
 * Visible behaviour: clears m_writeQueue while holding m_mutex.
 *
 * NOTE(review): listing lines 310-311 and 314-315 are absent from this extract, so it
 * cannot be confirmed here whether m_messages is emptied as well.
 */
309 void CCECAdapterMessageQueue::Clear(void)
312 CLockObject
lock(m_mutex
);
313 m_writeQueue
.Clear();
/*
 * Worker routine that drains the write queue.
 *
 * Visible behaviour:
 *  - pops an entry from m_writeQueue, passing MESSAGE_QUEUE_SIGNAL_WAIT_TIME as the
 *    timeout argument;
 *  - when an entry was popped and is non-NULL, writes its message to the device via
 *    m_com->WriteToDevice() while holding m_mutex;
 *  - then tests whether the message ended up in ADAPTER_MESSAGE_STATE_ERROR or is a
 *    MSGCODE_START_BOOTLOADER message;
 *  - calls CheckTimedOutMessages().
 *
 * NOTE(review): the enclosing loop, the statements controlled by the error/bootloader
 * test and the return statement are on listing lines absent from this extract
 * (318, 320-321, 324, 326, 329, 332-338, 340-342).
 */
317 void *CCECAdapterMessageQueue::Process(void)
319 CCECAdapterMessageQueueEntry
*message(NULL
);
322 /* wait for a new message */
323 if (m_writeQueue
.Pop(message
, MESSAGE_QUEUE_SIGNAL_WAIT_TIME
) && message
)
325 /* write this message */
327 CLockObject
lock(m_mutex
);
328 m_com
->WriteToDevice(message
->m_message
);
330 if (message
->m_message
->state
== ADAPTER_MESSAGE_STATE_ERROR
||
331 message
->m_message
->Message() == MSGCODE_START_BOOTLOADER
)
339 CheckTimedOutMessages();
/*
 * Removes entries from m_messages for which TimedOutOrSucceeded() is true (that check
 * only ever passes for fire-and-forget messages).
 *
 * With m_mutex held, in two passes:
 *  1. walk m_messages; for each matching entry remember its key, log
 *     "command '...' was not acked by the controller" when it did not succeed, and
 *     delete the CCECAdapterMessage it holds;
 *  2. erase the remembered keys from m_messages. Erasing is deferred to this second
 *     pass so the map is not modified while it is being iterated.
 *
 * NOTE(review): listing lines 349, 351 and 356-359 are absent from this extract.
 * Whether the CCECAdapterMessageQueueEntry object itself is deleted after its message
 * cannot be seen here - confirm, otherwise the entry leaks.
 */
344 void CCECAdapterMessageQueue::CheckTimedOutMessages(void)
346 CLockObject
lock(m_mutex
);
347 vector
<uint64_t> timedOut
;
348 for (map
<uint64_t, CCECAdapterMessageQueueEntry
*>::iterator it
= m_messages
.begin(); it
!= m_messages
.end(); it
++)
350 if (it
->second
->TimedOutOrSucceeded())
352 timedOut
.push_back(it
->first
);
353 if (!it
->second
->m_bSucceeded
)
354 m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(it
->second
->m_message
->Message()));
355 delete it
->second
->m_message
;
360 for (vector
<uint64_t>::iterator it
= timedOut
.begin(); it
!= timedOut
.end(); it
++)
362 uint64_t iEntryId
= *it
;
363 m_messages
.erase(iEntryId
);
/*
 * Routes a message received from the adapter.
 *
 * Visible behaviour:
 *  - with m_mutex held, offers msg to every entry in m_messages (in map order) until
 *    one of them reports it as handled;
 *  - in the "message wasn't handled" section: asks m_com->HandlePoll(msg) whether the
 *    message is an error and logs the message text, at CEC_LOG_WARNING for an error
 *    and CEC_LOG_DEBUG otherwise; a second log call always uses CEC_LOG_WARNING;
 *  - when the message is not an error and msg.PushToCecCommand(m_currentCECFrame)
 *    returns true, the frame is passed to m_callback->OnCommandReceived() (only if
 *    m_com->IsInitialised()) and then cleared.
 *
 * NOTE(review): the conditions guarding the unhandled section and the two log calls
 * are on listing lines absent from this extract (368, 374-376, 379, 381-382, 384-385,
 * 388, 392, 395-397); presumably preprocessor conditionals select one of the two log
 * calls - confirm.
 *
 * NOTE(review): both AddLog calls pass msg.ToString().c_str() in the position where
 * every other call site in this file passes a printf-style format string. If AddLog
 * interprets that argument as a format, a '%' in the message text would be
 * misinterpreted; passing "%s" plus the text would be safer - confirm AddLog's
 * signature.
 */
367 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage
&msg
)
369 bool bHandled(false);
370 CLockObject
lock(m_mutex
);
371 /* send the received message to each entry in the queue until it is handled */
372 for (map
<uint64_t, CCECAdapterMessageQueueEntry
*>::iterator it
= m_messages
.begin(); !bHandled
&& it
!= m_messages
.end(); it
++)
373 bHandled
= it
->second
->MessageReceived(msg
);
377 /* the message wasn't handled */
378 bool bIsError(m_com
->HandlePoll(msg
));
380 m_com
->m_callback
->GetLib()->AddLog(bIsError
? CEC_LOG_WARNING
: CEC_LOG_DEBUG
, msg
.ToString().c_str());
383 m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_WARNING
, msg
.ToString().c_str());
386 /* push this message to the current frame */
387 if (!bIsError
&& msg
.PushToCecCommand(m_currentCECFrame
))
389 /* and push the current frame back over the callback method when a full command was received */
390 if (m_com
->IsInitialised())
391 m_com
->m_callback
->OnCommandReceived(m_currentCECFrame
);
393 /* clear the current frame */
394 m_currentCECFrame
.Clear();
/*
 * Feeds raw bytes read from the adapter into the incoming-message assembler.
 *
 * For each of the iLen bytes in data:
 *  - with m_mutex held, the byte is pushed into m_incomingAdapterMessage;
 *    PushReceivedByte() reports whether a full message is now available;
 *  - for a full message, its packet is copied into a local CCECAdapterMessage, which
 *    is handed to MessageReceived(); afterwards m_incomingAdapterMessage is cleared,
 *    again with m_mutex held.
 *
 * NOTE(review): the braces that scope the two lock objects and the test on
 * bFullMessage are on listing lines absent from this extract (400, 402, 404,
 * 407-410, 415, 419-421). Presumably the first lock is released before
 * MessageReceived() is called, since that method acquires m_mutex itself - confirm.
 */
399 void CCECAdapterMessageQueue::AddData(uint8_t *data
, size_t iLen
)
401 for (size_t iPtr
= 0; iPtr
< iLen
; iPtr
++)
403 bool bFullMessage(false);
405 CLockObject
lock(m_mutex
);
406 bFullMessage
= m_incomingAdapterMessage
->PushReceivedByte(data
[iPtr
]);
411 /* a full message was received */
412 CCECAdapterMessage newMessage
;
413 newMessage
.packet
= m_incomingAdapterMessage
->packet
;
414 MessageReceived(newMessage
);
416 /* clear the current message */
417 CLockObject
lock(m_mutex
);
418 m_incomingAdapterMessage
->Clear();
/*
 * Queues `msg` for writing to the adapter and, unless it is fire-and-forget, waits
 * for the outcome.
 *
 * Visible steps:
 *  1. msg->state is set to ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT;
 *  2. for a transmission, m_com->SetLineTimeout(msg->lineTimeout) is called first;
 *  3. a CCECAdapterMessageQueueEntry is allocated for the message; an error path logs
 *     "couldn't create queue entry" and sets ADAPTER_MESSAGE_STATE_ERROR;
 *  4. unless the message is MSGCODE_START_BOOTLOADER, the entry is registered in
 *     m_messages under the next id (m_iNextMessage++) with m_mutex held;
 *  5. the entry is pushed onto m_writeQueue;
 *  6. when msg->bFireAndForget is false: waits on the entry, using
 *     CEC_DEFAULT_TRANSMIT_WAIT when msg->transmit_timeout <= 5 and
 *     msg->transmit_timeout otherwise; if the wait fails, logs that the command was
 *     not acked and sets ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED; the entry is erased
 *     from m_messages again (except for the bootloader message); finally, if
 *     msg->ReplyIsError() and the state is not SENT_NOT_ACKED, the state becomes
 *     ADAPTER_MESSAGE_STATE_ERROR.
 *
 * NOTE(review): the guard around the error path in step 3, the exact nesting of the
 * statements in step 6, any deletion of the entry and the return statement are on
 * listing lines absent from this extract (424, 426, 429, 431-432, 434-435, 438-440,
 * 444, 448-449, 452-453, 455, 457, 460-462, 464, 467-468, 471-476).
 */
423 bool CCECAdapterMessageQueue::Write(CCECAdapterMessage
*msg
)
425 msg
->state
= ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT
;
427 /* set the correct line timeout */
428 if (msg
->IsTranmission())
430 m_com
->SetLineTimeout(msg
->lineTimeout
);
433 CCECAdapterMessageQueueEntry
*entry
= new CCECAdapterMessageQueueEntry(this, msg
);
436 m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_ERROR
, "couldn't create queue entry for '%s'", CCECAdapterMessage::ToString(msg
->Message()));
437 msg
->state
= ADAPTER_MESSAGE_STATE_ERROR
;
441 uint64_t iEntryId(0);
442 /* add to the wait for ack queue */
443 if (msg
->Message() != MSGCODE_START_BOOTLOADER
)
445 CLockObject
lock(m_mutex
);
446 iEntryId
= m_iNextMessage
++;
447 m_messages
.insert(make_pair(iEntryId
, entry
));
450 /* add the message to the write queue */
451 m_writeQueue
.Push(entry
);
454 if (!msg
->bFireAndForget
)
456 if (!entry
->Wait(msg
->transmit_timeout
<= 5 ? CEC_DEFAULT_TRANSMIT_WAIT
: msg
->transmit_timeout
))
458 m_com
->m_callback
->GetLib()->AddLog(CEC_LOG_DEBUG
, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg
->Message()));
459 msg
->state
= ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED
;
463 if (msg
->Message() != MSGCODE_START_BOOTLOADER
)
465 CLockObject
lock(m_mutex
);
466 m_messages
.erase(iEntryId
);
469 if (msg
->ReplyIsError() && msg
->state
!= ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED
)
470 msg
->state
= ADAPTER_MESSAGE_STATE_ERROR
;
478 bool CCECAdapterMessageQueue::ProvidesExtendedResponse(void)
480 return m_com
&& m_com
->ProvidesExtendedResponse();