Handle volume change with an explicit flag
[deb_shairplay.git] / src / lib / raop_rtp.c
1 /**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <assert.h>
19 #include <errno.h>
20
21 #include "raop_rtp.h"
22 #include "raop.h"
23 #include "raop_buffer.h"
24 #include "netutils.h"
25 #include "utils.h"
26 #include "compat.h"
27 #include "logger.h"
28
/* Sentinel kept in raop_rtp_s.flush while no flush request is pending;
 * raop_rtp_process_events compares against it before flushing. */
enum { NO_FLUSH = -42 };
30
31 struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
35 /* Buffer to handle all resends */
36 raop_buffer_t *buffer;
37
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
42 /* MUTEX LOCKED VARIABLES START */
43 /* These variables only edited mutex locked */
44 int running;
45 int joined;
46
47 float volume;
48 int volume_changed;
49 unsigned char *metadata;
50 int metadata_len;
51 unsigned char *coverart;
52 int coverart_len;
53
54 int flush;
55 thread_handle_t thread;
56 mutex_handle_t run_mutex;
57 /* MUTEX LOCKED VARIABLES END */
58
59 /* Remote control and timing ports */
60 unsigned short control_rport;
61 unsigned short timing_rport;
62
63 /* Sockets for control, timing and data */
64 int csock, tsock, dsock;
65
66 /* Local control, timing and data ports */
67 unsigned short control_lport;
68 unsigned short timing_lport;
69 unsigned short data_lport;
70
71 /* Initialized after the first control packet */
72 struct sockaddr_storage control_saddr;
73 socklen_t control_saddr_len;
74 unsigned short control_seqnum;
75 };
76
77 static int
78 raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
79 {
80 char *original;
81 char *current;
82 char *tmpstr;
83 int family;
84 int ret;
85
86 assert(raop_rtp);
87
88 current = original = strdup(remote);
89 if (!original) {
90 return -1;
91 }
92 tmpstr = utils_strsep(&current, " ");
93 if (strcmp(tmpstr, "IN")) {
94 free(original);
95 return -1;
96 }
97 tmpstr = utils_strsep(&current, " ");
98 if (!strcmp(tmpstr, "IP4") && current) {
99 family = AF_INET;
100 } else if (!strcmp(tmpstr, "IP6") && current) {
101 family = AF_INET6;
102 } else {
103 free(original);
104 return -1;
105 }
106 if (strstr(current, ":")) {
107 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
108 family = AF_INET6;
109 }
110 ret = netutils_parse_address(family, current,
111 &raop_rtp->remote_saddr,
112 sizeof(raop_rtp->remote_saddr));
113 if (ret < 0) {
114 free(original);
115 return -1;
116 }
117 raop_rtp->remote_saddr_len = ret;
118 free(original);
119 return 0;
120 }
121
122 raop_rtp_t *
123 raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
124 const char *fmtp, const unsigned char *aeskey, const unsigned char *aesiv)
125 {
126 raop_rtp_t *raop_rtp;
127
128 assert(logger);
129 assert(callbacks);
130 assert(remote);
131 assert(fmtp);
132
133 raop_rtp = calloc(1, sizeof(raop_rtp_t));
134 if (!raop_rtp) {
135 return NULL;
136 }
137 raop_rtp->logger = logger;
138 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
139 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
140 if (!raop_rtp->buffer) {
141 free(raop_rtp);
142 return NULL;
143 }
144 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
145 free(raop_rtp);
146 return NULL;
147 }
148
149 raop_rtp->running = 0;
150 raop_rtp->joined = 1;
151 raop_rtp->flush = NO_FLUSH;
152 MUTEX_CREATE(raop_rtp->run_mutex);
153
154 return raop_rtp;
155 }
156
157 void
158 raop_rtp_destroy(raop_rtp_t *raop_rtp)
159 {
160 if (raop_rtp) {
161 raop_rtp_stop(raop_rtp);
162
163 MUTEX_DESTROY(raop_rtp->run_mutex);
164 raop_buffer_destroy(raop_rtp->buffer);
165 free(raop_rtp->metadata);
166 free(raop_rtp->coverart);
167 free(raop_rtp);
168 }
169 }
170
171 static int
172 raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
173 {
174 int csock = -1, tsock = -1, dsock = -1;
175 unsigned short cport = 0, tport = 0, dport = 0;
176
177 assert(raop_rtp);
178
179 if (use_udp) {
180 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
181 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
182 if (csock == -1 || tsock == -1) {
183 goto sockets_cleanup;
184 }
185 }
186 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
187 if (dsock == -1) {
188 goto sockets_cleanup;
189 }
190
191 /* Listen to the data socket if using TCP */
192 if (!use_udp) {
193 if (listen(dsock, 1) < 0)
194 goto sockets_cleanup;
195 }
196
197 /* Set socket descriptors */
198 raop_rtp->csock = csock;
199 raop_rtp->tsock = tsock;
200 raop_rtp->dsock = dsock;
201
202 /* Set port values */
203 raop_rtp->control_lport = cport;
204 raop_rtp->timing_lport = tport;
205 raop_rtp->data_lport = dport;
206 return 0;
207
208 sockets_cleanup:
209 if (csock != -1) closesocket(csock);
210 if (tsock != -1) closesocket(tsock);
211 if (dsock != -1) closesocket(dsock);
212 return -1;
213 }
214
215 static int
216 raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
217 {
218 raop_rtp_t *raop_rtp = opaque;
219 unsigned char packet[8];
220 unsigned short ourseqnum;
221 struct sockaddr *addr;
222 socklen_t addrlen;
223 int ret;
224
225 addr = (struct sockaddr *)&raop_rtp->control_saddr;
226 addrlen = raop_rtp->control_saddr_len;
227
228 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
229 ourseqnum = raop_rtp->control_seqnum++;
230
231 /* Fill the request buffer */
232 packet[0] = 0x80;
233 packet[1] = 0x55|0x80;
234 packet[2] = (ourseqnum >> 8);
235 packet[3] = ourseqnum;
236 packet[4] = (seqnum >> 8);
237 packet[5] = seqnum;
238 packet[6] = (count >> 8);
239 packet[7] = count;
240
241 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
242 if (ret == -1) {
243 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
244 }
245
246 return 0;
247 }
248
249 static int
250 raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data)
251 {
252 int flush;
253 float volume;
254 int volume_changed;
255 unsigned char *metadata;
256 int metadata_len;
257 unsigned char *coverart;
258 int coverart_len;
259
260 assert(raop_rtp);
261
262 MUTEX_LOCK(raop_rtp->run_mutex);
263 if (!raop_rtp->running) {
264 MUTEX_UNLOCK(raop_rtp->run_mutex);
265 return 1;
266 }
267
268 /* Read the volume level */
269 volume = raop_rtp->volume;
270 volume_changed = raop_rtp->volume_changed;
271 raop_rtp->volume_changed = 0;
272
273 /* Read the flush value */
274 flush = raop_rtp->flush;
275 raop_rtp->flush = NO_FLUSH;
276
277 /* Read the metadata */
278 metadata = raop_rtp->metadata;
279 metadata_len = raop_rtp->metadata_len;
280 raop_rtp->metadata = NULL;
281 raop_rtp->metadata_len = 0;
282
283 /* Read the coverart */
284 coverart = raop_rtp->coverart;
285 coverart_len = raop_rtp->coverart_len;
286 raop_rtp->coverart = NULL;
287 raop_rtp->coverart_len = 0;
288 MUTEX_UNLOCK(raop_rtp->run_mutex);
289
290 /* Call set_volume callback if changed */
291 if (volume_changed) {
292 if (raop_rtp->callbacks.audio_set_volume) {
293 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
294 }
295 }
296
297 /* Handle flush if requested */
298 if (flush != NO_FLUSH) {
299 raop_buffer_flush(raop_rtp->buffer, flush);
300 if (raop_rtp->callbacks.audio_flush) {
301 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
302 }
303 }
304 if (metadata != NULL) {
305 if (raop_rtp->callbacks.audio_set_metadata) {
306 raop_rtp->callbacks.audio_set_metadata(raop_rtp->callbacks.cls, cb_data, metadata, metadata_len);
307 }
308 free(metadata);
309 metadata = NULL;
310 }
311 if (coverart != NULL) {
312 if (raop_rtp->callbacks.audio_set_coverart) {
313 raop_rtp->callbacks.audio_set_coverart(raop_rtp->callbacks.cls, cb_data, coverart, coverart_len);
314 }
315 free(coverart);
316 coverart = NULL;
317 }
318 return 0;
319 }
320
321 static THREAD_RETVAL
322 raop_rtp_thread_udp(void *arg)
323 {
324 raop_rtp_t *raop_rtp = arg;
325 unsigned char packet[RAOP_PACKET_LEN];
326 unsigned int packetlen;
327 struct sockaddr_storage saddr;
328 socklen_t saddrlen;
329
330 const ALACSpecificConfig *config;
331 void *cb_data = NULL;
332
333 assert(raop_rtp);
334
335 config = raop_buffer_get_config(raop_rtp->buffer);
336 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
337 config->bitDepth,
338 config->numChannels,
339 config->sampleRate);
340
341 while(1) {
342 fd_set rfds;
343 struct timeval tv;
344 int nfds, ret;
345
346 /* Check if we are still running and process callbacks */
347 if (raop_rtp_process_events(raop_rtp, cb_data)) {
348 break;
349 }
350
351 /* Set timeout value to 5ms */
352 tv.tv_sec = 0;
353 tv.tv_usec = 5000;
354
355 /* Get the correct nfds value */
356 nfds = raop_rtp->csock+1;
357 if (raop_rtp->tsock >= nfds)
358 nfds = raop_rtp->tsock+1;
359 if (raop_rtp->dsock >= nfds)
360 nfds = raop_rtp->dsock+1;
361
362 /* Set rfds and call select */
363 FD_ZERO(&rfds);
364 FD_SET(raop_rtp->csock, &rfds);
365 FD_SET(raop_rtp->tsock, &rfds);
366 FD_SET(raop_rtp->dsock, &rfds);
367 ret = select(nfds, &rfds, NULL, NULL, &tv);
368 if (ret == 0) {
369 /* Timeout happened */
370 continue;
371 } else if (ret == -1) {
372 /* FIXME: Error happened */
373 break;
374 }
375
376 if (FD_ISSET(raop_rtp->csock, &rfds)) {
377 saddrlen = sizeof(saddr);
378 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
379 (struct sockaddr *)&saddr, &saddrlen);
380
381 /* Get the destination address here, because we need the sin6_scope_id */
382 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
383 raop_rtp->control_saddr_len = saddrlen;
384
385 if (packetlen >= 12) {
386 char type = packet[1] & ~0x80;
387
388 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
389 if (type == 0x56) {
390 /* Handle resent data packet */
391 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
392 assert(ret >= 0);
393 }
394 }
395 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
396 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
397 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
398 saddrlen = sizeof(saddr);
399 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
400 (struct sockaddr *)&saddr, &saddrlen);
401 if (packetlen >= 12) {
402 int no_resend = (raop_rtp->control_rport == 0);
403 int ret;
404
405 const void *audiobuf;
406 int audiobuflen;
407
408 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
409 assert(ret >= 0);
410
411 /* Decode all frames in queue */
412 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
413 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
414 }
415
416 /* Handle possible resend requests */
417 if (!no_resend) {
418 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
419 }
420 }
421 }
422 }
423 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
424 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
425
426 return 0;
427 }
428
429 static THREAD_RETVAL
430 raop_rtp_thread_tcp(void *arg)
431 {
432 raop_rtp_t *raop_rtp = arg;
433 int stream_fd = -1;
434 unsigned char packet[RAOP_PACKET_LEN];
435 unsigned int packetlen = 0;
436
437 const ALACSpecificConfig *config;
438 void *cb_data = NULL;
439
440 assert(raop_rtp);
441
442 config = raop_buffer_get_config(raop_rtp->buffer);
443 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
444 config->bitDepth,
445 config->numChannels,
446 config->sampleRate);
447
448 while (1) {
449 fd_set rfds;
450 struct timeval tv;
451 int nfds, ret;
452
453 /* Check if we are still running and process callbacks */
454 if (raop_rtp_process_events(raop_rtp, cb_data)) {
455 break;
456 }
457
458 /* Set timeout value to 5ms */
459 tv.tv_sec = 0;
460 tv.tv_usec = 5000;
461
462 /* Get the correct nfds value and set rfds */
463 FD_ZERO(&rfds);
464 if (stream_fd == -1) {
465 FD_SET(raop_rtp->dsock, &rfds);
466 nfds = raop_rtp->dsock+1;
467 } else {
468 FD_SET(stream_fd, &rfds);
469 nfds = stream_fd+1;
470 }
471 ret = select(nfds, &rfds, NULL, NULL, &tv);
472 if (ret == 0) {
473 /* Timeout happened */
474 continue;
475 } else if (ret == -1) {
476 /* FIXME: Error happened */
477 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
478 break;
479 }
480 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
481 struct sockaddr_storage saddr;
482 socklen_t saddrlen;
483
484 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
485 saddrlen = sizeof(saddr);
486 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
487 if (stream_fd == -1) {
488 /* FIXME: Error happened */
489 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
490 break;
491 }
492 }
493 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
494 unsigned int rtplen=0;
495
496 const void *audiobuf;
497 int audiobuflen;
498
499 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
500 if (ret == 0) {
501 /* TCP socket closed */
502 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
503 break;
504 } else if (ret == -1) {
505 /* FIXME: Error happened */
506 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
507 break;
508 }
509 packetlen += ret;
510
511 /* Check that we have enough bytes */
512 if (packetlen < 4) {
513 continue;
514 }
515 if (packet[0] != '$' || packet[1] != '\0') {
516 /* FIXME: Incorrect RTP magic bytes */
517 break;
518 }
519 rtplen = (packet[2] << 8) | packet[3];
520 if (rtplen > sizeof(packet)) {
521 /* FIXME: Too long packet */
522 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
523 break;
524 }
525 if (packetlen < 4+rtplen) {
526 continue;
527 }
528
529 /* Packet is valid, process it */
530 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
531 assert(ret >= 0);
532
533 /* Remove processed bytes from packet buffer */
534 memmove(packet, packet+4+rtplen, packetlen-rtplen);
535 packetlen -= 4+rtplen;
536
537 /* Decode the received frame */
538 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
539 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
540 }
541 }
542 }
543
544 /* Close the stream file descriptor */
545 if (stream_fd != -1) {
546 closesocket(stream_fd);
547 }
548
549 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
550 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
551
552 return 0;
553 }
554
555 void
556 raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
557 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
558 {
559 int use_ipv6 = 0;
560
561 assert(raop_rtp);
562
563 MUTEX_LOCK(raop_rtp->run_mutex);
564 if (raop_rtp->running || !raop_rtp->joined) {
565 MUTEX_UNLOCK(raop_rtp->run_mutex);
566 return;
567 }
568
569 /* Initialize ports and sockets */
570 raop_rtp->control_rport = control_rport;
571 raop_rtp->timing_rport = timing_rport;
572 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
573 use_ipv6 = 1;
574 }
575 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
576 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
577 MUTEX_UNLOCK(raop_rtp->run_mutex);
578 return;
579 }
580 if (control_lport) *control_lport = raop_rtp->control_lport;
581 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
582 if (data_lport) *data_lport = raop_rtp->data_lport;
583
584 /* Create the thread and initialize running values */
585 raop_rtp->running = 1;
586 raop_rtp->joined = 0;
587 if (use_udp) {
588 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
589 } else {
590 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
591 }
592 MUTEX_UNLOCK(raop_rtp->run_mutex);
593 }
594
595 void
596 raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
597 {
598 assert(raop_rtp);
599
600 if (volume > 0.0f) {
601 volume = 0.0f;
602 } else if (volume < -144.0f) {
603 volume = -144.0f;
604 }
605
606 /* Set volume in thread instead */
607 MUTEX_LOCK(raop_rtp->run_mutex);
608 raop_rtp->volume = volume;
609 raop_rtp->volume_changed = 1;
610 MUTEX_UNLOCK(raop_rtp->run_mutex);
611 }
612
613 void
614 raop_rtp_set_metadata(raop_rtp_t *raop_rtp, const char *data, int datalen)
615 {
616 unsigned char *metadata;
617
618 assert(raop_rtp);
619
620 if (datalen <= 0) {
621 return;
622 }
623 metadata = malloc(datalen);
624 assert(metadata);
625 memcpy(metadata, data, datalen);
626
627 /* Set metadata in thread instead */
628 MUTEX_LOCK(raop_rtp->run_mutex);
629 raop_rtp->metadata = metadata;
630 raop_rtp->metadata_len = datalen;
631 MUTEX_UNLOCK(raop_rtp->run_mutex);
632 }
633
634 void
635 raop_rtp_set_coverart(raop_rtp_t *raop_rtp, const char *data, int datalen)
636 {
637 unsigned char *coverart;
638
639 assert(raop_rtp);
640
641 if (datalen <= 0) {
642 return;
643 }
644 coverart = malloc(datalen);
645 assert(coverart);
646 memcpy(coverart, data, datalen);
647
648 /* Set coverart in thread instead */
649 MUTEX_LOCK(raop_rtp->run_mutex);
650 raop_rtp->coverart = coverart;
651 raop_rtp->coverart_len = datalen;
652 MUTEX_UNLOCK(raop_rtp->run_mutex);
653 }
654
655 void
656 raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
657 {
658 assert(raop_rtp);
659
660 /* Call flush in thread instead */
661 MUTEX_LOCK(raop_rtp->run_mutex);
662 raop_rtp->flush = next_seq;
663 MUTEX_UNLOCK(raop_rtp->run_mutex);
664 }
665
666 void
667 raop_rtp_stop(raop_rtp_t *raop_rtp)
668 {
669 assert(raop_rtp);
670
671 /* Check that we are running and thread is not
672 * joined (should never be while still running) */
673 MUTEX_LOCK(raop_rtp->run_mutex);
674 if (!raop_rtp->running || raop_rtp->joined) {
675 MUTEX_UNLOCK(raop_rtp->run_mutex);
676 return;
677 }
678 raop_rtp->running = 0;
679 MUTEX_UNLOCK(raop_rtp->run_mutex);
680
681 /* Join the thread */
682 THREAD_JOIN(raop_rtp->thread);
683 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
684 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
685 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
686
687 /* Flush buffer into initial state */
688 raop_buffer_flush(raop_rtp->buffer, -1);
689
690 /* Mark thread as joined */
691 MUTEX_LOCK(raop_rtp->run_mutex);
692 raop_rtp->joined = 1;
693 MUTEX_UNLOCK(raop_rtp->run_mutex);
694 }