Add support for receiving iTunes streams back, got broken in the process
[deb_shairplay.git] / src / lib / raop_rtp.c
CommitLineData
23e7e3ae
JVH
1/**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
2340bcd3
JVH
15#include <stdlib.h>
16#include <stdio.h>
17#include <string.h>
18#include <assert.h>
19#include <errno.h>
20
21#include "raop_rtp.h"
22#include "raop.h"
23#include "raop_buffer.h"
24#include "netutils.h"
ba0970e1 25#include "utils.h"
2340bcd3
JVH
26#include "compat.h"
27#include "logger.h"
28
29#define NO_FLUSH (-42)
30
31struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
ba0970e1 35 /* Buffer to handle all resends */
2340bcd3
JVH
36 raop_buffer_t *buffer;
37
ba0970e1
JVH
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
2340bcd3
JVH
42 /* These variables only edited mutex locked */
43 int running;
44 int joined;
45 float volume;
46 int flush;
47 thread_handle_t thread;
48 mutex_handle_t run_mutex;
49
50 /* Remote control and timing ports */
51 unsigned short control_rport;
52 unsigned short timing_rport;
53
54 /* Sockets for control, timing and data */
55 int csock, tsock, dsock;
56
57 /* Local control, timing and data ports */
58 unsigned short control_lport;
59 unsigned short timing_lport;
60 unsigned short data_lport;
61
ba0970e1 62 /* Initialized after the first control packet */
2340bcd3
JVH
63 struct sockaddr_storage control_saddr;
64 socklen_t control_saddr_len;
65 unsigned short control_seqnum;
66};
67
ba0970e1
JVH
68static int
69raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
70{
71 char *original;
72 char *current;
73 char *tmpstr;
74 int family;
75 int ret;
76
77 assert(raop_rtp);
78
79 current = original = strdup(remote);
80 if (!original) {
81 return -1;
82 }
83 tmpstr = utils_strsep(&current, " ");
84 if (strcmp(tmpstr, "IN")) {
85 free(original);
86 return -1;
87 }
88 tmpstr = utils_strsep(&current, " ");
89 if (!strcmp(tmpstr, "IP4") && current) {
90 family = AF_INET;
91 } else if (!strcmp(tmpstr, "IP6") && current) {
92 family = AF_INET6;
93 } else {
94 free(original);
95 return -1;
96 }
e66980f2
JVH
97 if (strstr(current, ":")) {
98 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
99 family = AF_INET6;
100 }
ba0970e1
JVH
101 ret = netutils_parse_address(family, current,
102 &raop_rtp->remote_saddr,
103 sizeof(raop_rtp->remote_saddr));
104 if (ret < 0) {
105 free(original);
106 return -1;
107 }
108 raop_rtp->remote_saddr_len = ret;
109 free(original);
110 return 0;
111}
112
2340bcd3 113raop_rtp_t *
ba0970e1
JVH
114raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
115 const char *fmtp, const unsigned char *aeskey, const unsigned char *aesiv)
2340bcd3
JVH
116{
117 raop_rtp_t *raop_rtp;
118
119 assert(logger);
ba0970e1
JVH
120 assert(callbacks);
121 assert(remote);
122 assert(fmtp);
2340bcd3
JVH
123
124 raop_rtp = calloc(1, sizeof(raop_rtp_t));
125 if (!raop_rtp) {
126 return NULL;
127 }
128 raop_rtp->logger = logger;
129 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
130 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
131 if (!raop_rtp->buffer) {
132 free(raop_rtp);
133 return NULL;
134 }
ba0970e1
JVH
135 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
136 free(raop_rtp);
137 return NULL;
138 }
2340bcd3
JVH
139
140 raop_rtp->running = 0;
141 raop_rtp->joined = 1;
142 raop_rtp->flush = NO_FLUSH;
143 MUTEX_CREATE(raop_rtp->run_mutex);
144
145 return raop_rtp;
146}
147
148void
149raop_rtp_destroy(raop_rtp_t *raop_rtp)
150{
151 if (raop_rtp) {
152 raop_rtp_stop(raop_rtp);
153
154 MUTEX_DESTROY(raop_rtp->run_mutex);
155 raop_buffer_destroy(raop_rtp->buffer);
156 free(raop_rtp);
157 }
158}
159
160static int
161raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
162{
163 int csock = -1, tsock = -1, dsock = -1;
164 unsigned short cport = 0, tport = 0, dport = 0;
165
166 assert(raop_rtp);
167
168 if (use_udp) {
169 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
170 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
171 if (csock == -1 || tsock == -1) {
172 goto sockets_cleanup;
173 }
174 }
175 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
176 if (dsock == -1) {
177 goto sockets_cleanup;
178 }
179
180 /* Listen to the data socket if using TCP */
181 if (!use_udp) {
182 if (listen(dsock, 1) < 0)
183 goto sockets_cleanup;
184 }
185
186 /* Set socket descriptors */
187 raop_rtp->csock = csock;
188 raop_rtp->tsock = tsock;
189 raop_rtp->dsock = dsock;
190
191 /* Set port values */
192 raop_rtp->control_lport = cport;
193 raop_rtp->timing_lport = tport;
194 raop_rtp->data_lport = dport;
195 return 0;
196
197sockets_cleanup:
198 if (csock != -1) closesocket(csock);
199 if (tsock != -1) closesocket(tsock);
200 if (dsock != -1) closesocket(dsock);
201 return -1;
202}
203
204static int
205raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
206{
207 raop_rtp_t *raop_rtp = opaque;
208 unsigned char packet[8];
209 unsigned short ourseqnum;
210 struct sockaddr *addr;
211 socklen_t addrlen;
ba0970e1 212 int ret;
2340bcd3
JVH
213
214 addr = (struct sockaddr *)&raop_rtp->control_saddr;
215 addrlen = raop_rtp->control_saddr_len;
216
46212791 217 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
2340bcd3
JVH
218 ourseqnum = raop_rtp->control_seqnum++;
219
220 /* Fill the request buffer */
221 packet[0] = 0x80;
222 packet[1] = 0x55|0x80;
223 packet[2] = (ourseqnum >> 8);
224 packet[3] = ourseqnum;
225 packet[4] = (seqnum >> 8);
226 packet[5] = seqnum;
227 packet[6] = (count >> 8);
228 packet[7] = count;
229
ba0970e1
JVH
230 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
231 if (ret == -1) {
46212791 232 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
ba0970e1
JVH
233 }
234
2340bcd3
JVH
235 return 0;
236}
237
238static THREAD_RETVAL
239raop_rtp_thread_udp(void *arg)
240{
241 raop_rtp_t *raop_rtp = arg;
242 unsigned char packet[RAOP_PACKET_LEN];
243 unsigned int packetlen;
244 struct sockaddr_storage saddr;
245 socklen_t saddrlen;
aaa2c733 246 float volume = 0.0;
2340bcd3
JVH
247
248 const ALACSpecificConfig *config;
249 void *cb_data = NULL;
250
251 assert(raop_rtp);
252
253 config = raop_buffer_get_config(raop_rtp->buffer);
067f00ef 254 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
2340bcd3
JVH
255 config->bitDepth,
256 config->numChannels,
257 config->sampleRate);
258
259 while(1) {
260 int volume_changed;
2340bcd3
JVH
261 int flush;
262
263 fd_set rfds;
264 struct timeval tv;
265 int nfds, ret;
266
267 MUTEX_LOCK(raop_rtp->run_mutex);
268 if (!raop_rtp->running) {
269 MUTEX_UNLOCK(raop_rtp->run_mutex);
270 break;
271 }
272 /* Read the volume level */
273 volume_changed = (volume != raop_rtp->volume);
274 volume = raop_rtp->volume;
275
276 /* Read the flush value */
277 flush = raop_rtp->flush;
278 raop_rtp->flush = NO_FLUSH;
279 MUTEX_UNLOCK(raop_rtp->run_mutex);
280
281 /* Call set_volume callback if changed */
282 if (volume_changed) {
283 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
284 }
285 if (flush != NO_FLUSH) {
286 raop_buffer_flush(raop_rtp->buffer, flush);
287 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
288 }
289
290 /* Set timeout value to 5ms */
291 tv.tv_sec = 0;
292 tv.tv_usec = 5000;
293
294 /* Get the correct nfds value */
295 nfds = raop_rtp->csock+1;
296 if (raop_rtp->tsock >= nfds)
297 nfds = raop_rtp->tsock+1;
298 if (raop_rtp->dsock >= nfds)
299 nfds = raop_rtp->dsock+1;
300
301 /* Set rfds and call select */
302 FD_ZERO(&rfds);
303 FD_SET(raop_rtp->csock, &rfds);
304 FD_SET(raop_rtp->tsock, &rfds);
305 FD_SET(raop_rtp->dsock, &rfds);
306 ret = select(nfds, &rfds, NULL, NULL, &tv);
307 if (ret == 0) {
308 /* Timeout happened */
309 continue;
310 } else if (ret == -1) {
311 /* FIXME: Error happened */
312 break;
313 }
314
315 if (FD_ISSET(raop_rtp->csock, &rfds)) {
316 saddrlen = sizeof(saddr);
317 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
318 (struct sockaddr *)&saddr, &saddrlen);
319
ba0970e1 320 /* Get the destination address here, because we need the sin6_scope_id */
2340bcd3
JVH
321 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
322 raop_rtp->control_saddr_len = saddrlen;
323
324 if (packetlen >= 12) {
325 char type = packet[1] & ~0x80;
326
46212791 327 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
2340bcd3
JVH
328 if (type == 0x56) {
329 /* Handle resent data packet */
330 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
331 assert(ret >= 0);
332 }
333 }
334 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
46212791 335 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
2340bcd3
JVH
336 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
337 saddrlen = sizeof(saddr);
338 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
339 (struct sockaddr *)&saddr, &saddrlen);
340 if (packetlen >= 12) {
341 int no_resend = (raop_rtp->control_rport == 0);
342 int ret;
343
344 const void *audiobuf;
345 int audiobuflen;
346
347 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
348 assert(ret >= 0);
349
350 /* Decode all frames in queue */
351 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
352 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
353 }
354
355 /* Handle possible resend requests */
356 if (!no_resend) {
357 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
358 }
359 }
360 }
361 }
46212791 362 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
2340bcd3
JVH
363 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
364
365 return 0;
366}
367
368static THREAD_RETVAL
369raop_rtp_thread_tcp(void *arg)
370{
371 raop_rtp_t *raop_rtp = arg;
372 int stream_fd = -1;
373 unsigned char packet[RAOP_PACKET_LEN];
374 unsigned int packetlen = 0;
aaa2c733 375 float volume = 0.0;
2340bcd3
JVH
376
377 const ALACSpecificConfig *config;
378 void *cb_data = NULL;
379
380 assert(raop_rtp);
381
382 config = raop_buffer_get_config(raop_rtp->buffer);
067f00ef 383 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
2340bcd3
JVH
384 config->bitDepth,
385 config->numChannels,
386 config->sampleRate);
387
388 while (1) {
389 int volume_changed;
2340bcd3
JVH
390
391 fd_set rfds;
392 struct timeval tv;
393 int nfds, ret;
394
395 MUTEX_LOCK(raop_rtp->run_mutex);
396 if (!raop_rtp->running) {
397 MUTEX_UNLOCK(raop_rtp->run_mutex);
398 break;
399 }
400 volume_changed = (volume != raop_rtp->volume);
401 volume = raop_rtp->volume;
402 MUTEX_UNLOCK(raop_rtp->run_mutex);
403
404 /* Call set_volume callback if changed */
405 if (volume_changed) {
406 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
407 }
408
409 /* Set timeout value to 5ms */
410 tv.tv_sec = 0;
411 tv.tv_usec = 5000;
412
413 /* Get the correct nfds value and set rfds */
414 FD_ZERO(&rfds);
415 if (stream_fd == -1) {
416 FD_SET(raop_rtp->dsock, &rfds);
417 nfds = raop_rtp->dsock+1;
418 } else {
419 FD_SET(stream_fd, &rfds);
420 nfds = stream_fd+1;
421 }
422 ret = select(nfds, &rfds, NULL, NULL, &tv);
423 if (ret == 0) {
424 /* Timeout happened */
425 continue;
426 } else if (ret == -1) {
427 /* FIXME: Error happened */
46212791 428 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
2340bcd3
JVH
429 break;
430 }
431 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
432 struct sockaddr_storage saddr;
433 socklen_t saddrlen;
434
46212791 435 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
2340bcd3
JVH
436 saddrlen = sizeof(saddr);
437 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
438 if (stream_fd == -1) {
439 /* FIXME: Error happened */
46212791 440 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
2340bcd3
JVH
441 break;
442 }
443 }
444 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
445 unsigned int rtplen=0;
2340bcd3
JVH
446
447 const void *audiobuf;
448 int audiobuflen;
449
450 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
451 if (ret == 0) {
452 /* TCP socket closed */
46212791 453 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
2340bcd3
JVH
454 break;
455 } else if (ret == -1) {
456 /* FIXME: Error happened */
46212791 457 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
2340bcd3
JVH
458 break;
459 }
460 packetlen += ret;
461
462 /* Check that we have enough bytes */
463 if (packetlen < 4) {
464 continue;
465 }
466 if (packet[0] != '$' || packet[1] != '\0') {
467 /* FIXME: Incorrect RTP magic bytes */
468 break;
469 }
470 rtplen = (packet[2] << 8) | packet[3];
471 if (rtplen > sizeof(packet)) {
472 /* FIXME: Too long packet */
46212791 473 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
2340bcd3
JVH
474 break;
475 }
476 if (packetlen < 4+rtplen) {
477 continue;
478 }
479
480 /* Packet is valid, process it */
2340bcd3
JVH
481 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
482 assert(ret >= 0);
483
484 /* Remove processed bytes from packet buffer */
485 memmove(packet, packet+4+rtplen, packetlen-rtplen);
486 packetlen -= 4+rtplen;
487
488 /* Decode the received frame */
489 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
490 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
491 }
492 }
493 }
494
495 /* Close the stream file descriptor */
496 if (stream_fd != -1) {
497 closesocket(stream_fd);
498 }
499
46212791 500 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
2340bcd3
JVH
501 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
502
503 return 0;
504}
505
506void
507raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
508 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
509{
ba0970e1
JVH
510 int use_ipv6 = 0;
511
2340bcd3
JVH
512 assert(raop_rtp);
513
514 MUTEX_LOCK(raop_rtp->run_mutex);
515 if (raop_rtp->running || !raop_rtp->joined) {
516 MUTEX_UNLOCK(raop_rtp->run_mutex);
517 return;
518 }
519
520 /* Initialize ports and sockets */
521 raop_rtp->control_rport = control_rport;
522 raop_rtp->timing_rport = timing_rport;
ba0970e1
JVH
523 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
524 use_ipv6 = 1;
525 }
526 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
46212791 527 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
2340bcd3
JVH
528 MUTEX_UNLOCK(raop_rtp->run_mutex);
529 return;
530 }
531 if (control_lport) *control_lport = raop_rtp->control_lport;
532 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
533 if (data_lport) *data_lport = raop_rtp->data_lport;
534
535 /* Create the thread and initialize running values */
536 raop_rtp->running = 1;
537 raop_rtp->joined = 0;
538 if (use_udp) {
539 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
540 } else {
541 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
542 }
543 MUTEX_UNLOCK(raop_rtp->run_mutex);
544}
545
546void
547raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
548{
549 assert(raop_rtp);
550
551 if (volume > 0.0f) {
552 volume = 0.0f;
553 } else if (volume < -144.0f) {
554 volume = -144.0f;
555 }
556
557 /* Set volume in thread instead */
558 MUTEX_LOCK(raop_rtp->run_mutex);
559 raop_rtp->volume = volume;
560 MUTEX_UNLOCK(raop_rtp->run_mutex);
561}
562
563void
564raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
565{
566 assert(raop_rtp);
567
568 /* Call flush in thread instead */
569 MUTEX_LOCK(raop_rtp->run_mutex);
570 raop_rtp->flush = next_seq;
571 MUTEX_UNLOCK(raop_rtp->run_mutex);
572}
573
574void
575raop_rtp_stop(raop_rtp_t *raop_rtp)
576{
577 assert(raop_rtp);
578
579 /* Check that we are running and thread is not
580 * joined (should never be while still running) */
581 MUTEX_LOCK(raop_rtp->run_mutex);
582 if (!raop_rtp->running || raop_rtp->joined) {
583 MUTEX_UNLOCK(raop_rtp->run_mutex);
584 return;
585 }
586 raop_rtp->running = 0;
587 MUTEX_UNLOCK(raop_rtp->run_mutex);
588
589 /* Join the thread */
590 THREAD_JOIN(raop_rtp->thread);
591 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
592 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
593 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
594
595 /* Flush buffer into initial state */
596 raop_buffer_flush(raop_rtp->buffer, -1);
597
598 /* Mark thread as joined */
599 MUTEX_LOCK(raop_rtp->run_mutex);
600 raop_rtp->joined = 1;
601 MUTEX_UNLOCK(raop_rtp->run_mutex);
602}