4e035007e6423543f39e5158117b4e70b0b7a419
[deb_libcec.git] / src / lib / adapter / Pulse-Eight / USBCECAdapterMessageQueue.cpp
1 /*
2 * This file is part of the libCEC(R) library.
3 *
4 * libCEC(R) is Copyright (C) 2011-2012 Pulse-Eight Limited. All rights reserved.
5 * libCEC(R) is an original work, containing original code.
6 *
7 * libCEC(R) is a trademark of Pulse-Eight Limited.
8 *
9 * This program is dual-licensed; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 *
23 *
24 * Alternatively, you can license this library under a commercial license,
25 * please contact Pulse-Eight Licensing for more information.
26 *
27 * For more information contact:
28 * Pulse-Eight Licensing <license@pulse-eight.com>
29 * http://www.pulse-eight.com/
30 * http://www.pulse-eight.net/
31 */
32
33 #include "env.h"
34 #include "USBCECAdapterMessageQueue.h"
35
36 #include "USBCECAdapterCommunication.h"
37 #include "USBCECAdapterMessage.h"
38 #include "lib/platform/sockets/socket.h"
39 #include "lib/LibCEC.h"
40 #include "lib/platform/util/StdString.h"
41
42 using namespace CEC;
43 using namespace PLATFORM;
44 using namespace std;
45
46 #define MESSAGE_QUEUE_SIGNAL_WAIT_TIME 1000
47
48 CCECAdapterMessageQueueEntry::CCECAdapterMessageQueueEntry(CCECAdapterMessageQueue *queue, CCECAdapterMessage *message) :
49 m_queue(queue),
50 m_message(message),
51 m_iPacketsLeft(message->IsTranmission() ? message->Size() / 4 : 1),
52 m_bSucceeded(false),
53 m_bWaiting(true) {}
54
55 CCECAdapterMessageQueueEntry::~CCECAdapterMessageQueueEntry(void) { }
56
57 void CCECAdapterMessageQueueEntry::Broadcast(void)
58 {
59 CLockObject lock(m_mutex);
60 m_condition.Broadcast();
61 }
62
63 bool CCECAdapterMessageQueueEntry::MessageReceived(const CCECAdapterMessage &message)
64 {
65 bool bHandled(false);
66
67 if (IsResponse(message))
68 {
69 switch (message.Message())
70 {
71 case MSGCODE_COMMAND_ACCEPTED:
72 bHandled = MessageReceivedCommandAccepted(message);
73 break;
74 case MSGCODE_TRANSMIT_SUCCEEDED:
75 bHandled = MessageReceivedTransmitSucceeded(message);
76 break;
77 default:
78 bHandled = MessageReceivedResponse(message);
79 break;
80 }
81 }
82
83 return bHandled;
84 }
85
86 void CCECAdapterMessageQueueEntry::Signal(void)
87 {
88 CLockObject lock(m_mutex);
89 m_bSucceeded = true;
90 m_condition.Signal();
91 }
92
93 bool CCECAdapterMessageQueueEntry::Wait(uint32_t iTimeout)
94 {
95 bool bReturn(false);
96 /* wait until we receive a signal when the tranmission succeeded */
97 {
98 CLockObject lock(m_mutex);
99 bReturn = m_bSucceeded ? true : m_condition.Wait(m_mutex, m_bSucceeded, iTimeout);
100 m_bWaiting = false;
101 }
102 return bReturn;
103 }
104
105 bool CCECAdapterMessageQueueEntry::IsWaiting(void)
106 {
107 CLockObject lock(m_mutex);
108 return m_bWaiting;
109 }
110
111 cec_adapter_messagecode CCECAdapterMessageQueueEntry::MessageCode(void)
112 {
113 return m_message->Message();
114 }
115
116 bool CCECAdapterMessageQueueEntry::IsResponseOld(const CCECAdapterMessage &msg)
117 {
118 cec_adapter_messagecode msgCode = msg.Message();
119
120 return msgCode == MessageCode() ||
121 msgCode == MSGCODE_COMMAND_ACCEPTED ||
122 msgCode == MSGCODE_COMMAND_REJECTED ||
123 (m_message->IsTranmission() && (msgCode == MSGCODE_TIMEOUT_ERROR ||
124 msgCode == MSGCODE_HIGH_ERROR ||
125 msgCode == MSGCODE_LOW_ERROR ||
126 msgCode == MSGCODE_RECEIVE_FAILED ||
127 msgCode == MSGCODE_TRANSMIT_FAILED_LINE ||
128 msgCode == MSGCODE_TRANSMIT_FAILED_ACK ||
129 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA ||
130 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE ||
131 msgCode == MSGCODE_TRANSMIT_SUCCEEDED));
132 }
133
134 bool CCECAdapterMessageQueueEntry::IsResponse(const CCECAdapterMessage &msg)
135 {
136 cec_adapter_messagecode thisMsgCode = m_message->Message();
137 cec_adapter_messagecode msgCode = msg.Message();
138 cec_adapter_messagecode msgResponse = msg.ResponseTo();
139
140 // msgcode matches, always a response
141 if (msgCode == MessageCode())
142 return true;
143
144 if (!ProvidesExtendedResponse())
145 return IsResponseOld(msg);
146
147 // response without a msgcode
148 if (msgResponse == MSGCODE_NOTHING)
149 return false;
150
151 // commands that only repond with accepted/rejected
152 if (thisMsgCode == MSGCODE_PING ||
153 thisMsgCode == MSGCODE_SET_ACK_MASK ||
154 thisMsgCode == MSGCODE_SET_CONTROLLED ||
155 thisMsgCode == MSGCODE_SET_AUTO_ENABLED ||
156 thisMsgCode == MSGCODE_SET_DEFAULT_LOGICAL_ADDRESS ||
157 thisMsgCode == MSGCODE_SET_LOGICAL_ADDRESS_MASK ||
158 thisMsgCode == MSGCODE_SET_PHYSICAL_ADDRESS ||
159 thisMsgCode == MSGCODE_SET_DEVICE_TYPE ||
160 thisMsgCode == MSGCODE_SET_HDMI_VERSION ||
161 thisMsgCode == MSGCODE_SET_OSD_NAME ||
162 thisMsgCode == MSGCODE_WRITE_EEPROM ||
163 thisMsgCode == MSGCODE_TRANSMIT_IDLETIME)
164 return thisMsgCode == msgResponse;
165
166 if (!m_message->IsTranmission())
167 {
168 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "FIXME! not a transmission");
169 return false;
170 }
171
172 return ((msgCode == MSGCODE_COMMAND_ACCEPTED || msgCode == MSGCODE_COMMAND_REJECTED) &&
173 (msgResponse == MSGCODE_TRANSMIT_ACK_POLARITY || msgResponse == MSGCODE_TRANSMIT || msgResponse == MSGCODE_TRANSMIT_EOM)) ||
174 msgCode == MSGCODE_TIMEOUT_ERROR ||
175 msgCode == MSGCODE_RECEIVE_FAILED ||
176 msgCode == MSGCODE_TRANSMIT_FAILED_ACK ||
177 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_DATA ||
178 msgCode == MSGCODE_TRANSMIT_FAILED_TIMEOUT_LINE ||
179 msgCode == MSGCODE_TRANSMIT_SUCCEEDED;
180 }
181
182 const char *CCECAdapterMessageQueueEntry::ToString(void) const
183 {
184 /* CEC transmissions got the 'set ack polarity' msgcode, which doesn't look nice */
185 if (m_message->IsTranmission())
186 return "CEC transmission";
187 else
188 return CCECAdapterMessage::ToString(m_message->Message());
189 }
190
191 bool CCECAdapterMessageQueueEntry::MessageReceivedCommandAccepted(const CCECAdapterMessage &message)
192 {
193 bool bSendSignal(false);
194 bool bHandled(false);
195 {
196 CLockObject lock(m_mutex);
197 if (m_iPacketsLeft > 0)
198 {
199 /* decrease by 1 */
200 m_iPacketsLeft--;
201
202 /* log this message */
203 CStdString strLog;
204 strLog.Format("%s - command accepted", ToString());
205 if (m_iPacketsLeft > 0)
206 strLog.AppendFormat(" - waiting for %d more", m_iPacketsLeft);
207 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, strLog);
208
209 /* no more packets left and not a transmission, so we're done */
210 if (!m_message->IsTranmission() && m_iPacketsLeft == 0)
211 {
212 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
213 m_message->response = message.packet;
214 bSendSignal = true;
215 }
216 bHandled = true;
217 }
218 }
219
220 if (bSendSignal)
221 Signal();
222
223 return bHandled;
224 }
225
226 bool CCECAdapterMessageQueueEntry::MessageReceivedTransmitSucceeded(const CCECAdapterMessage &message)
227 {
228 {
229 CLockObject lock(m_mutex);
230 if (m_iPacketsLeft == 0)
231 {
232 /* transmission succeeded, so we're done */
233 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - transmit succeeded", ToString());
234 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
235 m_message->response = message.packet;
236 }
237 else
238 {
239 /* error, we expected more acks
240 since the messages are processed in order, this should not happen, so this is an error situation */
241 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_WARNING, "%s - received 'transmit succeeded' but not enough 'command accepted' messages (%d left)", ToString(), m_iPacketsLeft);
242 m_message->state = ADAPTER_MESSAGE_STATE_ERROR;
243 }
244 }
245
246 Signal();
247
248 return true;
249 }
250
251 bool CCECAdapterMessageQueueEntry::MessageReceivedResponse(const CCECAdapterMessage &message)
252 {
253 {
254 CLockObject lock(m_mutex);
255 m_queue->m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "%s - received response - %s", ToString(), message.ToString().c_str());
256 m_message->response = message.packet;
257 if (m_message->IsTranmission())
258 m_message->state = message.Message() == MSGCODE_TRANSMIT_SUCCEEDED ? ADAPTER_MESSAGE_STATE_SENT_ACKED : ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
259 else
260 m_message->state = ADAPTER_MESSAGE_STATE_SENT_ACKED;
261 }
262
263 Signal();
264
265 return true;
266 }
267
268 bool CCECAdapterMessageQueueEntry::ProvidesExtendedResponse(void)
269 {
270 return m_queue && m_queue->ProvidesExtendedResponse();
271 }
272
273 CCECAdapterMessageQueue::CCECAdapterMessageQueue(CUSBCECAdapterCommunication *com) :
274 PLATFORM::CThread(),
275 m_com(com),
276 m_iNextMessage(0)
277 {
278 m_incomingAdapterMessage = new CCECAdapterMessage;
279 m_currentCECFrame.Clear();
280 }
281
282 CCECAdapterMessageQueue::~CCECAdapterMessageQueue(void)
283 {
284 Clear();
285 StopThread(0);
286 delete m_incomingAdapterMessage;
287 }
288
289 void CCECAdapterMessageQueue::Clear(void)
290 {
291 StopThread(5);
292 CLockObject lock(m_mutex);
293 m_writeQueue.Clear();
294 m_messages.clear();
295 }
296
297 void *CCECAdapterMessageQueue::Process(void)
298 {
299 CCECAdapterMessageQueueEntry *message(NULL);
300 while (!IsStopped())
301 {
302 /* wait for a new message */
303 if (m_writeQueue.Pop(message, MESSAGE_QUEUE_SIGNAL_WAIT_TIME) && message)
304 {
305 /* write this message */
306 {
307 CLockObject lock(m_mutex);
308 m_com->WriteToDevice(message->m_message);
309 }
310 if (message->m_message->state == ADAPTER_MESSAGE_STATE_ERROR ||
311 message->m_message->Message() == MSGCODE_START_BOOTLOADER)
312 {
313 message->Signal();
314 Clear();
315 break;
316 }
317 }
318 }
319 return NULL;
320 }
321
322 void CCECAdapterMessageQueue::MessageReceived(const CCECAdapterMessage &msg)
323 {
324 bool bHandled(false);
325 CLockObject lock(m_mutex);
326 /* send the received message to each entry in the queue until it is handled */
327 for (map<uint64_t, CCECAdapterMessageQueueEntry *>::iterator it = m_messages.begin(); !bHandled && it != m_messages.end(); it++)
328 bHandled = it->second->MessageReceived(msg);
329
330 if (!bHandled)
331 {
332 /* the message wasn't handled */
333 bool bIsError(m_com->HandlePoll(msg));
334 m_com->m_callback->GetLib()->AddLog(bIsError ? CEC_LOG_WARNING : CEC_LOG_DEBUG, msg.ToString().c_str());
335
336 /* push this message to the current frame */
337 if (!bIsError && msg.PushToCecCommand(m_currentCECFrame))
338 {
339 /* and push the current frame back over the callback method when a full command was received */
340 if (m_com->IsInitialised())
341 m_com->m_callback->OnCommandReceived(m_currentCECFrame);
342
343 /* clear the current frame */
344 m_currentCECFrame.Clear();
345 }
346 }
347 }
348
349 void CCECAdapterMessageQueue::AddData(uint8_t *data, size_t iLen)
350 {
351 for (size_t iPtr = 0; iPtr < iLen; iPtr++)
352 {
353 bool bFullMessage(false);
354 {
355 CLockObject lock(m_mutex);
356 bFullMessage = m_incomingAdapterMessage->PushReceivedByte(data[iPtr]);
357 }
358
359 if (bFullMessage)
360 {
361 /* a full message was received */
362 CCECAdapterMessage newMessage;
363 newMessage.packet = m_incomingAdapterMessage->packet;
364 MessageReceived(newMessage);
365
366 /* clear the current message */
367 CLockObject lock(m_mutex);
368 m_incomingAdapterMessage->Clear();
369 }
370 }
371 }
372
373 bool CCECAdapterMessageQueue::Write(CCECAdapterMessage *msg)
374 {
375 msg->state = ADAPTER_MESSAGE_STATE_WAITING_TO_BE_SENT;
376
377 /* set the correct line timeout */
378 if (msg->IsTranmission())
379 {
380 m_com->SetLineTimeout(msg->lineTimeout);
381 }
382
383 CCECAdapterMessageQueueEntry *entry = new CCECAdapterMessageQueueEntry(this, msg);
384 uint64_t iEntryId(0);
385 /* add to the wait for ack queue */
386 if (msg->Message() != MSGCODE_START_BOOTLOADER)
387 {
388 CLockObject lock(m_mutex);
389 iEntryId = m_iNextMessage++;
390 m_messages.insert(make_pair(iEntryId, entry));
391 }
392
393 /* add the message to the write queue */
394 m_writeQueue.Push(entry);
395
396 bool bReturn(true);
397 if (entry)
398 {
399 if (!entry->Wait(msg->transmit_timeout <= 5 ? CEC_DEFAULT_TRANSMIT_WAIT : msg->transmit_timeout))
400 {
401 m_com->m_callback->GetLib()->AddLog(CEC_LOG_DEBUG, "command '%s' was not acked by the controller", CCECAdapterMessage::ToString(msg->Message()));
402 msg->state = ADAPTER_MESSAGE_STATE_SENT_NOT_ACKED;
403 bReturn = false;
404 }
405
406 if (msg->Message() != MSGCODE_START_BOOTLOADER)
407 {
408 CLockObject lock(m_mutex);
409 m_messages.erase(iEntryId);
410 }
411
412 if (msg->ReplyIsError())
413 msg->state = ADAPTER_MESSAGE_STATE_ERROR;
414
415 delete entry;
416 }
417
418 return bReturn;
419 }
420
421 bool CCECAdapterMessageQueue::ProvidesExtendedResponse(void)
422 {
423 return m_com && m_com->ProvidesExtendedResponse();
424 }