Imported Upstream version 0.9.0
[deb_shairplay.git] / src / lib / raop_rtp.c
CommitLineData
15c988f7
JB
1/**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
15#include <stdlib.h>
16#include <stdio.h>
17#include <string.h>
18#include <assert.h>
19#include <errno.h>
20
21#include "raop_rtp.h"
22#include "raop.h"
23#include "raop_buffer.h"
24#include "netutils.h"
25#include "utils.h"
26#include "compat.h"
27#include "logger.h"
28
29#define NO_FLUSH (-42)
30
31struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
35 /* Buffer to handle all resends */
36 raop_buffer_t *buffer;
37
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
42 /* MUTEX LOCKED VARIABLES START */
43 /* These variables only edited mutex locked */
44 int running;
45 int joined;
46
47 float volume;
48 int volume_changed;
49 unsigned char *metadata;
50 int metadata_len;
51 unsigned char *coverart;
52 int coverart_len;
53
54 int flush;
55 thread_handle_t thread;
56 mutex_handle_t run_mutex;
57 /* MUTEX LOCKED VARIABLES END */
58
59 /* Remote control and timing ports */
60 unsigned short control_rport;
61 unsigned short timing_rport;
62
63 /* Sockets for control, timing and data */
64 int csock, tsock, dsock;
65
66 /* Local control, timing and data ports */
67 unsigned short control_lport;
68 unsigned short timing_lport;
69 unsigned short data_lport;
70
71 /* Initialized after the first control packet */
72 struct sockaddr_storage control_saddr;
73 socklen_t control_saddr_len;
74 unsigned short control_seqnum;
75};
76
77static int
78raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
79{
80 char *original;
81 char *current;
82 char *tmpstr;
83 int family;
84 int ret;
85
86 assert(raop_rtp);
87
88 current = original = strdup(remote);
89 if (!original) {
90 return -1;
91 }
92 tmpstr = utils_strsep(&current, " ");
93 if (strcmp(tmpstr, "IN")) {
94 free(original);
95 return -1;
96 }
97 tmpstr = utils_strsep(&current, " ");
98 if (!strcmp(tmpstr, "IP4") && current) {
99 family = AF_INET;
100 } else if (!strcmp(tmpstr, "IP6") && current) {
101 family = AF_INET6;
102 } else {
103 free(original);
104 return -1;
105 }
106 if (strstr(current, ":")) {
107 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
108 family = AF_INET6;
109 }
110 ret = netutils_parse_address(family, current,
111 &raop_rtp->remote_saddr,
112 sizeof(raop_rtp->remote_saddr));
113 if (ret < 0) {
114 free(original);
115 return -1;
116 }
117 raop_rtp->remote_saddr_len = ret;
118 free(original);
119 return 0;
120}
121
122raop_rtp_t *
123raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
124 const char *rtpmap, const char *fmtp,
125 const unsigned char *aeskey, const unsigned char *aesiv)
126{
127 raop_rtp_t *raop_rtp;
128
129 assert(logger);
130 assert(callbacks);
131 assert(remote);
132 assert(rtpmap);
133 assert(fmtp);
134
135 raop_rtp = calloc(1, sizeof(raop_rtp_t));
136 if (!raop_rtp) {
137 return NULL;
138 }
139 raop_rtp->logger = logger;
140 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
141 raop_rtp->buffer = raop_buffer_init(rtpmap, fmtp, aeskey, aesiv);
142 if (!raop_rtp->buffer) {
143 free(raop_rtp);
144 return NULL;
145 }
146 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
147 free(raop_rtp);
148 return NULL;
149 }
150
151 raop_rtp->running = 0;
152 raop_rtp->joined = 1;
153 raop_rtp->flush = NO_FLUSH;
154 MUTEX_CREATE(raop_rtp->run_mutex);
155
156 return raop_rtp;
157}
158
159void
160raop_rtp_destroy(raop_rtp_t *raop_rtp)
161{
162 if (raop_rtp) {
163 raop_rtp_stop(raop_rtp);
164
165 MUTEX_DESTROY(raop_rtp->run_mutex);
166 raop_buffer_destroy(raop_rtp->buffer);
167 free(raop_rtp->metadata);
168 free(raop_rtp->coverart);
169 free(raop_rtp);
170 }
171}
172
173static int
174raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
175{
176 int csock = -1, tsock = -1, dsock = -1;
177 unsigned short cport = 0, tport = 0, dport = 0;
178
179 assert(raop_rtp);
180
181 if (use_udp) {
182 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
183 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
184 if (csock == -1 || tsock == -1) {
185 goto sockets_cleanup;
186 }
187 }
188 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
189 if (dsock == -1) {
190 goto sockets_cleanup;
191 }
192
193 /* Listen to the data socket if using TCP */
194 if (!use_udp) {
195 if (listen(dsock, 1) < 0)
196 goto sockets_cleanup;
197 }
198
199 /* Set socket descriptors */
200 raop_rtp->csock = csock;
201 raop_rtp->tsock = tsock;
202 raop_rtp->dsock = dsock;
203
204 /* Set port values */
205 raop_rtp->control_lport = cport;
206 raop_rtp->timing_lport = tport;
207 raop_rtp->data_lport = dport;
208 return 0;
209
210sockets_cleanup:
211 if (csock != -1) closesocket(csock);
212 if (tsock != -1) closesocket(tsock);
213 if (dsock != -1) closesocket(dsock);
214 return -1;
215}
216
217static int
218raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
219{
220 raop_rtp_t *raop_rtp = opaque;
221 unsigned char packet[8];
222 unsigned short ourseqnum;
223 struct sockaddr *addr;
224 socklen_t addrlen;
225 int ret;
226
227 addr = (struct sockaddr *)&raop_rtp->control_saddr;
228 addrlen = raop_rtp->control_saddr_len;
229
230 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
231 ourseqnum = raop_rtp->control_seqnum++;
232
233 /* Fill the request buffer */
234 packet[0] = 0x80;
235 packet[1] = 0x55|0x80;
236 packet[2] = (ourseqnum >> 8);
237 packet[3] = ourseqnum;
238 packet[4] = (seqnum >> 8);
239 packet[5] = seqnum;
240 packet[6] = (count >> 8);
241 packet[7] = count;
242
243 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
244 if (ret == -1) {
245 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
246 }
247
248 return 0;
249}
250
251static int
252raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data)
253{
254 int flush;
255 float volume;
256 int volume_changed;
257 unsigned char *metadata;
258 int metadata_len;
259 unsigned char *coverart;
260 int coverart_len;
261
262 assert(raop_rtp);
263
264 MUTEX_LOCK(raop_rtp->run_mutex);
265 if (!raop_rtp->running) {
266 MUTEX_UNLOCK(raop_rtp->run_mutex);
267 return 1;
268 }
269
270 /* Read the volume level */
271 volume = raop_rtp->volume;
272 volume_changed = raop_rtp->volume_changed;
273 raop_rtp->volume_changed = 0;
274
275 /* Read the flush value */
276 flush = raop_rtp->flush;
277 raop_rtp->flush = NO_FLUSH;
278
279 /* Read the metadata */
280 metadata = raop_rtp->metadata;
281 metadata_len = raop_rtp->metadata_len;
282 raop_rtp->metadata = NULL;
283 raop_rtp->metadata_len = 0;
284
285 /* Read the coverart */
286 coverart = raop_rtp->coverart;
287 coverart_len = raop_rtp->coverart_len;
288 raop_rtp->coverart = NULL;
289 raop_rtp->coverart_len = 0;
290 MUTEX_UNLOCK(raop_rtp->run_mutex);
291
292 /* Call set_volume callback if changed */
293 if (volume_changed) {
294 if (raop_rtp->callbacks.audio_set_volume) {
295 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
296 }
297 }
298
299 /* Handle flush if requested */
300 if (flush != NO_FLUSH) {
301 raop_buffer_flush(raop_rtp->buffer, flush);
302 if (raop_rtp->callbacks.audio_flush) {
303 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
304 }
305 }
306 if (metadata != NULL) {
307 if (raop_rtp->callbacks.audio_set_metadata) {
308 raop_rtp->callbacks.audio_set_metadata(raop_rtp->callbacks.cls, cb_data, metadata, metadata_len);
309 }
310 free(metadata);
311 metadata = NULL;
312 }
313 if (coverart != NULL) {
314 if (raop_rtp->callbacks.audio_set_coverart) {
315 raop_rtp->callbacks.audio_set_coverart(raop_rtp->callbacks.cls, cb_data, coverart, coverart_len);
316 }
317 free(coverart);
318 coverart = NULL;
319 }
320 return 0;
321}
322
323static THREAD_RETVAL
324raop_rtp_thread_udp(void *arg)
325{
326 raop_rtp_t *raop_rtp = arg;
327 unsigned char packet[RAOP_PACKET_LEN];
328 unsigned int packetlen;
329 struct sockaddr_storage saddr;
330 socklen_t saddrlen;
331
332 const ALACSpecificConfig *config;
333 void *cb_data = NULL;
334
335 assert(raop_rtp);
336
337 config = raop_buffer_get_config(raop_rtp->buffer);
338 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
339 config->bitDepth,
340 config->numChannels,
341 config->sampleRate);
342
343 while(1) {
344 fd_set rfds;
345 struct timeval tv;
346 int nfds, ret;
347
348 /* Check if we are still running and process callbacks */
349 if (raop_rtp_process_events(raop_rtp, cb_data)) {
350 break;
351 }
352
353 /* Set timeout value to 5ms */
354 tv.tv_sec = 0;
355 tv.tv_usec = 5000;
356
357 /* Get the correct nfds value */
358 nfds = raop_rtp->csock+1;
359 if (raop_rtp->tsock >= nfds)
360 nfds = raop_rtp->tsock+1;
361 if (raop_rtp->dsock >= nfds)
362 nfds = raop_rtp->dsock+1;
363
364 /* Set rfds and call select */
365 FD_ZERO(&rfds);
366 FD_SET(raop_rtp->csock, &rfds);
367 FD_SET(raop_rtp->tsock, &rfds);
368 FD_SET(raop_rtp->dsock, &rfds);
369 ret = select(nfds, &rfds, NULL, NULL, &tv);
370 if (ret == 0) {
371 /* Timeout happened */
372 continue;
373 } else if (ret == -1) {
374 /* FIXME: Error happened */
375 break;
376 }
377
378 if (FD_ISSET(raop_rtp->csock, &rfds)) {
379 saddrlen = sizeof(saddr);
380 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
381 (struct sockaddr *)&saddr, &saddrlen);
382
383 /* Get the destination address here, because we need the sin6_scope_id */
384 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
385 raop_rtp->control_saddr_len = saddrlen;
386
387 if (packetlen >= 12) {
388 char type = packet[1] & ~0x80;
389
390 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
391 if (type == 0x56) {
392 /* Handle resent data packet */
393 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
394 assert(ret >= 0);
395 }
396 }
397 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
398 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
399 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
400 saddrlen = sizeof(saddr);
401 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
402 (struct sockaddr *)&saddr, &saddrlen);
403 if (packetlen >= 12) {
404 int no_resend = (raop_rtp->control_rport == 0);
405 int ret;
406
407 const void *audiobuf;
408 int audiobuflen;
409
410 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
411 assert(ret >= 0);
412
413 /* Decode all frames in queue */
414 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
415 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
416 }
417
418 /* Handle possible resend requests */
419 if (!no_resend) {
420 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
421 }
422 }
423 }
424 }
425 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
426 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
427
428 return 0;
429}
430
431static THREAD_RETVAL
432raop_rtp_thread_tcp(void *arg)
433{
434 raop_rtp_t *raop_rtp = arg;
435 int stream_fd = -1;
436 unsigned char packet[RAOP_PACKET_LEN];
437 unsigned int packetlen = 0;
438
439 const ALACSpecificConfig *config;
440 void *cb_data = NULL;
441
442 assert(raop_rtp);
443
444 config = raop_buffer_get_config(raop_rtp->buffer);
445 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
446 config->bitDepth,
447 config->numChannels,
448 config->sampleRate);
449
450 while (1) {
451 fd_set rfds;
452 struct timeval tv;
453 int nfds, ret;
454
455 /* Check if we are still running and process callbacks */
456 if (raop_rtp_process_events(raop_rtp, cb_data)) {
457 break;
458 }
459
460 /* Set timeout value to 5ms */
461 tv.tv_sec = 0;
462 tv.tv_usec = 5000;
463
464 /* Get the correct nfds value and set rfds */
465 FD_ZERO(&rfds);
466 if (stream_fd == -1) {
467 FD_SET(raop_rtp->dsock, &rfds);
468 nfds = raop_rtp->dsock+1;
469 } else {
470 FD_SET(stream_fd, &rfds);
471 nfds = stream_fd+1;
472 }
473 ret = select(nfds, &rfds, NULL, NULL, &tv);
474 if (ret == 0) {
475 /* Timeout happened */
476 continue;
477 } else if (ret == -1) {
478 /* FIXME: Error happened */
479 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
480 break;
481 }
482 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
483 struct sockaddr_storage saddr;
484 socklen_t saddrlen;
485
486 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
487 saddrlen = sizeof(saddr);
488 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
489 if (stream_fd == -1) {
490 /* FIXME: Error happened */
491 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
492 break;
493 }
494 }
495 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
496 unsigned int rtplen=0;
497
498 const void *audiobuf;
499 int audiobuflen;
500
501 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
502 if (ret == 0) {
503 /* TCP socket closed */
504 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
505 break;
506 } else if (ret == -1) {
507 /* FIXME: Error happened */
508 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
509 break;
510 }
511 packetlen += ret;
512
513 /* Check that we have enough bytes */
514 if (packetlen < 4) {
515 continue;
516 }
517 if (packet[0] != '$' || packet[1] != '\0') {
518 /* FIXME: Incorrect RTP magic bytes */
519 break;
520 }
521 rtplen = (packet[2] << 8) | packet[3];
522 if (rtplen > sizeof(packet)) {
523 /* FIXME: Too long packet */
524 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
525 break;
526 }
527 if (packetlen < 4+rtplen) {
528 continue;
529 }
530
531 /* Packet is valid, process it */
532 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
533 assert(ret >= 0);
534
535 /* Remove processed bytes from packet buffer */
536 memmove(packet, packet+4+rtplen, packetlen-rtplen);
537 packetlen -= 4+rtplen;
538
539 /* Decode the received frame */
540 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
541 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
542 }
543 }
544 }
545
546 /* Close the stream file descriptor */
547 if (stream_fd != -1) {
548 closesocket(stream_fd);
549 }
550
551 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
552 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
553
554 return 0;
555}
556
557void
558raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
559 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
560{
561 int use_ipv6 = 0;
562
563 assert(raop_rtp);
564
565 MUTEX_LOCK(raop_rtp->run_mutex);
566 if (raop_rtp->running || !raop_rtp->joined) {
567 MUTEX_UNLOCK(raop_rtp->run_mutex);
568 return;
569 }
570
571 /* Initialize ports and sockets */
572 raop_rtp->control_rport = control_rport;
573 raop_rtp->timing_rport = timing_rport;
574 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
575 use_ipv6 = 1;
576 }
577 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
578 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
579 MUTEX_UNLOCK(raop_rtp->run_mutex);
580 return;
581 }
582 if (control_lport) *control_lport = raop_rtp->control_lport;
583 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
584 if (data_lport) *data_lport = raop_rtp->data_lport;
585
586 /* Create the thread and initialize running values */
587 raop_rtp->running = 1;
588 raop_rtp->joined = 0;
589 if (use_udp) {
590 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
591 } else {
592 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
593 }
594 MUTEX_UNLOCK(raop_rtp->run_mutex);
595}
596
597void
598raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
599{
600 assert(raop_rtp);
601
602 if (volume > 0.0f) {
603 volume = 0.0f;
604 } else if (volume < -144.0f) {
605 volume = -144.0f;
606 }
607
608 /* Set volume in thread instead */
609 MUTEX_LOCK(raop_rtp->run_mutex);
610 raop_rtp->volume = volume;
611 raop_rtp->volume_changed = 1;
612 MUTEX_UNLOCK(raop_rtp->run_mutex);
613}
614
615void
616raop_rtp_set_metadata(raop_rtp_t *raop_rtp, const char *data, int datalen)
617{
618 unsigned char *metadata;
619
620 assert(raop_rtp);
621
622 if (datalen <= 0) {
623 return;
624 }
625 metadata = malloc(datalen);
626 assert(metadata);
627 memcpy(metadata, data, datalen);
628
629 /* Set metadata in thread instead */
630 MUTEX_LOCK(raop_rtp->run_mutex);
631 raop_rtp->metadata = metadata;
632 raop_rtp->metadata_len = datalen;
633 MUTEX_UNLOCK(raop_rtp->run_mutex);
634}
635
636void
637raop_rtp_set_coverart(raop_rtp_t *raop_rtp, const char *data, int datalen)
638{
639 unsigned char *coverart;
640
641 assert(raop_rtp);
642
643 if (datalen <= 0) {
644 return;
645 }
646 coverart = malloc(datalen);
647 assert(coverart);
648 memcpy(coverart, data, datalen);
649
650 /* Set coverart in thread instead */
651 MUTEX_LOCK(raop_rtp->run_mutex);
652 raop_rtp->coverart = coverart;
653 raop_rtp->coverart_len = datalen;
654 MUTEX_UNLOCK(raop_rtp->run_mutex);
655}
656
657void
658raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
659{
660 assert(raop_rtp);
661
662 /* Call flush in thread instead */
663 MUTEX_LOCK(raop_rtp->run_mutex);
664 raop_rtp->flush = next_seq;
665 MUTEX_UNLOCK(raop_rtp->run_mutex);
666}
667
668void
669raop_rtp_stop(raop_rtp_t *raop_rtp)
670{
671 assert(raop_rtp);
672
673 /* Check that we are running and thread is not
674 * joined (should never be while still running) */
675 MUTEX_LOCK(raop_rtp->run_mutex);
676 if (!raop_rtp->running || raop_rtp->joined) {
677 MUTEX_UNLOCK(raop_rtp->run_mutex);
678 return;
679 }
680 raop_rtp->running = 0;
681 MUTEX_UNLOCK(raop_rtp->run_mutex);
682
683 /* Join the thread */
684 THREAD_JOIN(raop_rtp->thread);
685 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
686 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
687 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
688
689 /* Flush buffer into initial state */
690 raop_buffer_flush(raop_rtp->buffer, -1);
691
692 /* Mark thread as joined */
693 MUTEX_LOCK(raop_rtp->run_mutex);
694 raop_rtp->joined = 1;
695 MUTEX_UNLOCK(raop_rtp->run_mutex);
696}