Add log level and log callback support to the RAOP API
[deb_shairplay.git] / src / lib / raop_rtp.c
1 /**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <assert.h>
19 #include <errno.h>
20
21 #include "raop_rtp.h"
22 #include "raop.h"
23 #include "raop_buffer.h"
24 #include "netutils.h"
25 #include "utils.h"
26 #include "compat.h"
27 #include "logger.h"
28
/* Sentinel kept in raop_rtp_s.flush while no flush request is pending */
enum { NO_FLUSH = -42 };
30
31 struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
35 /* Buffer to handle all resends */
36 raop_buffer_t *buffer;
37
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
42 /* MUTEX LOCKED VARIABLES START */
43 /* These variables only edited mutex locked */
44 int running;
45 int joined;
46
47 float volume;
48 unsigned char *metadata;
49 int metadata_len;
50 unsigned char *coverart;
51 int coverart_len;
52
53 int flush;
54 thread_handle_t thread;
55 mutex_handle_t run_mutex;
56 /* MUTEX LOCKED VARIABLES END */
57
58 /* Remote control and timing ports */
59 unsigned short control_rport;
60 unsigned short timing_rport;
61
62 /* Sockets for control, timing and data */
63 int csock, tsock, dsock;
64
65 /* Local control, timing and data ports */
66 unsigned short control_lport;
67 unsigned short timing_lport;
68 unsigned short data_lport;
69
70 /* Initialized after the first control packet */
71 struct sockaddr_storage control_saddr;
72 socklen_t control_saddr_len;
73 unsigned short control_seqnum;
74 };
75
76 static int
77 raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
78 {
79 char *original;
80 char *current;
81 char *tmpstr;
82 int family;
83 int ret;
84
85 assert(raop_rtp);
86
87 current = original = strdup(remote);
88 if (!original) {
89 return -1;
90 }
91 tmpstr = utils_strsep(&current, " ");
92 if (strcmp(tmpstr, "IN")) {
93 free(original);
94 return -1;
95 }
96 tmpstr = utils_strsep(&current, " ");
97 if (!strcmp(tmpstr, "IP4") && current) {
98 family = AF_INET;
99 } else if (!strcmp(tmpstr, "IP6") && current) {
100 family = AF_INET6;
101 } else {
102 free(original);
103 return -1;
104 }
105 if (strstr(current, ":")) {
106 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
107 family = AF_INET6;
108 }
109 ret = netutils_parse_address(family, current,
110 &raop_rtp->remote_saddr,
111 sizeof(raop_rtp->remote_saddr));
112 if (ret < 0) {
113 free(original);
114 return -1;
115 }
116 raop_rtp->remote_saddr_len = ret;
117 free(original);
118 return 0;
119 }
120
121 raop_rtp_t *
122 raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
123 const char *fmtp, const unsigned char *aeskey, const unsigned char *aesiv)
124 {
125 raop_rtp_t *raop_rtp;
126
127 assert(logger);
128 assert(callbacks);
129 assert(remote);
130 assert(fmtp);
131
132 raop_rtp = calloc(1, sizeof(raop_rtp_t));
133 if (!raop_rtp) {
134 return NULL;
135 }
136 raop_rtp->logger = logger;
137 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
138 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
139 if (!raop_rtp->buffer) {
140 free(raop_rtp);
141 return NULL;
142 }
143 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
144 free(raop_rtp);
145 return NULL;
146 }
147
148 raop_rtp->running = 0;
149 raop_rtp->joined = 1;
150 raop_rtp->flush = NO_FLUSH;
151 MUTEX_CREATE(raop_rtp->run_mutex);
152
153 return raop_rtp;
154 }
155
156 void
157 raop_rtp_destroy(raop_rtp_t *raop_rtp)
158 {
159 if (raop_rtp) {
160 raop_rtp_stop(raop_rtp);
161
162 MUTEX_DESTROY(raop_rtp->run_mutex);
163 raop_buffer_destroy(raop_rtp->buffer);
164 free(raop_rtp->metadata);
165 free(raop_rtp->coverart);
166 free(raop_rtp);
167 }
168 }
169
170 static int
171 raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
172 {
173 int csock = -1, tsock = -1, dsock = -1;
174 unsigned short cport = 0, tport = 0, dport = 0;
175
176 assert(raop_rtp);
177
178 if (use_udp) {
179 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
180 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
181 if (csock == -1 || tsock == -1) {
182 goto sockets_cleanup;
183 }
184 }
185 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
186 if (dsock == -1) {
187 goto sockets_cleanup;
188 }
189
190 /* Listen to the data socket if using TCP */
191 if (!use_udp) {
192 if (listen(dsock, 1) < 0)
193 goto sockets_cleanup;
194 }
195
196 /* Set socket descriptors */
197 raop_rtp->csock = csock;
198 raop_rtp->tsock = tsock;
199 raop_rtp->dsock = dsock;
200
201 /* Set port values */
202 raop_rtp->control_lport = cport;
203 raop_rtp->timing_lport = tport;
204 raop_rtp->data_lport = dport;
205 return 0;
206
207 sockets_cleanup:
208 if (csock != -1) closesocket(csock);
209 if (tsock != -1) closesocket(tsock);
210 if (dsock != -1) closesocket(dsock);
211 return -1;
212 }
213
214 static int
215 raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
216 {
217 raop_rtp_t *raop_rtp = opaque;
218 unsigned char packet[8];
219 unsigned short ourseqnum;
220 struct sockaddr *addr;
221 socklen_t addrlen;
222 int ret;
223
224 addr = (struct sockaddr *)&raop_rtp->control_saddr;
225 addrlen = raop_rtp->control_saddr_len;
226
227 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
228 ourseqnum = raop_rtp->control_seqnum++;
229
230 /* Fill the request buffer */
231 packet[0] = 0x80;
232 packet[1] = 0x55|0x80;
233 packet[2] = (ourseqnum >> 8);
234 packet[3] = ourseqnum;
235 packet[4] = (seqnum >> 8);
236 packet[5] = seqnum;
237 packet[6] = (count >> 8);
238 packet[7] = count;
239
240 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
241 if (ret == -1) {
242 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
243 }
244
245 return 0;
246 }
247
248 static int
249 raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data, float *volume)
250 {
251 int flush;
252 int volume_changed;
253 unsigned char *metadata;
254 int metadata_len;
255 unsigned char *coverart;
256 int coverart_len;
257
258 assert(raop_rtp);
259 assert(volume);
260
261 MUTEX_LOCK(raop_rtp->run_mutex);
262 if (!raop_rtp->running) {
263 MUTEX_UNLOCK(raop_rtp->run_mutex);
264 return 1;
265 }
266
267 /* Read the volume level */
268 volume_changed = (*volume != raop_rtp->volume);
269 *volume = raop_rtp->volume;
270
271 /* Read the flush value */
272 flush = raop_rtp->flush;
273 raop_rtp->flush = NO_FLUSH;
274
275 /* Read the metadata */
276 metadata = raop_rtp->metadata;
277 metadata_len = raop_rtp->metadata_len;
278 raop_rtp->metadata = NULL;
279 raop_rtp->metadata_len = 0;
280
281 /* Read the coverart */
282 coverart = raop_rtp->coverart;
283 coverart_len = raop_rtp->coverart_len;
284 raop_rtp->coverart = NULL;
285 raop_rtp->coverart_len = 0;
286 MUTEX_UNLOCK(raop_rtp->run_mutex);
287
288 /* Call set_volume callback if changed */
289 if (volume_changed) {
290 if (raop_rtp->callbacks.audio_set_volume) {
291 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, *volume);
292 }
293 }
294
295 /* Handle flush if requested */
296 if (flush != NO_FLUSH) {
297 raop_buffer_flush(raop_rtp->buffer, flush);
298 if (raop_rtp->callbacks.audio_flush) {
299 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
300 }
301 }
302 if (metadata != NULL) {
303 if (raop_rtp->callbacks.audio_set_metadata) {
304 raop_rtp->callbacks.audio_set_metadata(raop_rtp->callbacks.cls, cb_data, metadata, metadata_len);
305 }
306 free(metadata);
307 metadata = NULL;
308 }
309 if (coverart != NULL) {
310 if (raop_rtp->callbacks.audio_set_coverart) {
311 raop_rtp->callbacks.audio_set_coverart(raop_rtp->callbacks.cls, cb_data, coverart, coverart_len);
312 }
313 free(coverart);
314 coverart = NULL;
315 }
316 return 0;
317 }
318
319 static THREAD_RETVAL
320 raop_rtp_thread_udp(void *arg)
321 {
322 raop_rtp_t *raop_rtp = arg;
323 unsigned char packet[RAOP_PACKET_LEN];
324 unsigned int packetlen;
325 struct sockaddr_storage saddr;
326 socklen_t saddrlen;
327 float volume = 0.0;
328
329 const ALACSpecificConfig *config;
330 void *cb_data = NULL;
331
332 assert(raop_rtp);
333
334 config = raop_buffer_get_config(raop_rtp->buffer);
335 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
336 config->bitDepth,
337 config->numChannels,
338 config->sampleRate);
339
340 while(1) {
341 fd_set rfds;
342 struct timeval tv;
343 int nfds, ret;
344
345 /* Check if we are still running and process callbacks */
346 if (raop_rtp_process_events(raop_rtp, cb_data, &volume)) {
347 break;
348 }
349
350 /* Set timeout value to 5ms */
351 tv.tv_sec = 0;
352 tv.tv_usec = 5000;
353
354 /* Get the correct nfds value */
355 nfds = raop_rtp->csock+1;
356 if (raop_rtp->tsock >= nfds)
357 nfds = raop_rtp->tsock+1;
358 if (raop_rtp->dsock >= nfds)
359 nfds = raop_rtp->dsock+1;
360
361 /* Set rfds and call select */
362 FD_ZERO(&rfds);
363 FD_SET(raop_rtp->csock, &rfds);
364 FD_SET(raop_rtp->tsock, &rfds);
365 FD_SET(raop_rtp->dsock, &rfds);
366 ret = select(nfds, &rfds, NULL, NULL, &tv);
367 if (ret == 0) {
368 /* Timeout happened */
369 continue;
370 } else if (ret == -1) {
371 /* FIXME: Error happened */
372 break;
373 }
374
375 if (FD_ISSET(raop_rtp->csock, &rfds)) {
376 saddrlen = sizeof(saddr);
377 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
378 (struct sockaddr *)&saddr, &saddrlen);
379
380 /* Get the destination address here, because we need the sin6_scope_id */
381 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
382 raop_rtp->control_saddr_len = saddrlen;
383
384 if (packetlen >= 12) {
385 char type = packet[1] & ~0x80;
386
387 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
388 if (type == 0x56) {
389 /* Handle resent data packet */
390 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
391 assert(ret >= 0);
392 }
393 }
394 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
395 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
396 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
397 saddrlen = sizeof(saddr);
398 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
399 (struct sockaddr *)&saddr, &saddrlen);
400 if (packetlen >= 12) {
401 int no_resend = (raop_rtp->control_rport == 0);
402 int ret;
403
404 const void *audiobuf;
405 int audiobuflen;
406
407 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
408 assert(ret >= 0);
409
410 /* Decode all frames in queue */
411 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
412 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
413 }
414
415 /* Handle possible resend requests */
416 if (!no_resend) {
417 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
418 }
419 }
420 }
421 }
422 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
423 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
424
425 return 0;
426 }
427
428 static THREAD_RETVAL
429 raop_rtp_thread_tcp(void *arg)
430 {
431 raop_rtp_t *raop_rtp = arg;
432 int stream_fd = -1;
433 unsigned char packet[RAOP_PACKET_LEN];
434 unsigned int packetlen = 0;
435 float volume = 0.0;
436
437 const ALACSpecificConfig *config;
438 void *cb_data = NULL;
439
440 assert(raop_rtp);
441
442 config = raop_buffer_get_config(raop_rtp->buffer);
443 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
444 config->bitDepth,
445 config->numChannels,
446 config->sampleRate);
447
448 while (1) {
449 fd_set rfds;
450 struct timeval tv;
451 int nfds, ret;
452
453 /* Check if we are still running and process callbacks */
454 if (raop_rtp_process_events(raop_rtp, cb_data, &volume)) {
455 break;
456 }
457
458 /* Set timeout value to 5ms */
459 tv.tv_sec = 0;
460 tv.tv_usec = 5000;
461
462 /* Get the correct nfds value and set rfds */
463 FD_ZERO(&rfds);
464 if (stream_fd == -1) {
465 FD_SET(raop_rtp->dsock, &rfds);
466 nfds = raop_rtp->dsock+1;
467 } else {
468 FD_SET(stream_fd, &rfds);
469 nfds = stream_fd+1;
470 }
471 ret = select(nfds, &rfds, NULL, NULL, &tv);
472 if (ret == 0) {
473 /* Timeout happened */
474 continue;
475 } else if (ret == -1) {
476 /* FIXME: Error happened */
477 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
478 break;
479 }
480 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
481 struct sockaddr_storage saddr;
482 socklen_t saddrlen;
483
484 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
485 saddrlen = sizeof(saddr);
486 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
487 if (stream_fd == -1) {
488 /* FIXME: Error happened */
489 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
490 break;
491 }
492 }
493 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
494 unsigned int rtplen=0;
495
496 const void *audiobuf;
497 int audiobuflen;
498
499 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
500 if (ret == 0) {
501 /* TCP socket closed */
502 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
503 break;
504 } else if (ret == -1) {
505 /* FIXME: Error happened */
506 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
507 break;
508 }
509 packetlen += ret;
510
511 /* Check that we have enough bytes */
512 if (packetlen < 4) {
513 continue;
514 }
515 if (packet[0] != '$' || packet[1] != '\0') {
516 /* FIXME: Incorrect RTP magic bytes */
517 break;
518 }
519 rtplen = (packet[2] << 8) | packet[3];
520 if (rtplen > sizeof(packet)) {
521 /* FIXME: Too long packet */
522 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
523 break;
524 }
525 if (packetlen < 4+rtplen) {
526 continue;
527 }
528
529 /* Packet is valid, process it */
530 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
531 assert(ret >= 0);
532
533 /* Remove processed bytes from packet buffer */
534 memmove(packet, packet+4+rtplen, packetlen-rtplen);
535 packetlen -= 4+rtplen;
536
537 /* Decode the received frame */
538 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
539 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
540 }
541 }
542 }
543
544 /* Close the stream file descriptor */
545 if (stream_fd != -1) {
546 closesocket(stream_fd);
547 }
548
549 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
550 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
551
552 return 0;
553 }
554
555 void
556 raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
557 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
558 {
559 int use_ipv6 = 0;
560
561 assert(raop_rtp);
562
563 MUTEX_LOCK(raop_rtp->run_mutex);
564 if (raop_rtp->running || !raop_rtp->joined) {
565 MUTEX_UNLOCK(raop_rtp->run_mutex);
566 return;
567 }
568
569 /* Initialize ports and sockets */
570 raop_rtp->control_rport = control_rport;
571 raop_rtp->timing_rport = timing_rport;
572 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
573 use_ipv6 = 1;
574 }
575 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
576 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
577 MUTEX_UNLOCK(raop_rtp->run_mutex);
578 return;
579 }
580 if (control_lport) *control_lport = raop_rtp->control_lport;
581 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
582 if (data_lport) *data_lport = raop_rtp->data_lport;
583
584 /* Create the thread and initialize running values */
585 raop_rtp->running = 1;
586 raop_rtp->joined = 0;
587 if (use_udp) {
588 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
589 } else {
590 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
591 }
592 MUTEX_UNLOCK(raop_rtp->run_mutex);
593 }
594
595 void
596 raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
597 {
598 assert(raop_rtp);
599
600 if (volume > 0.0f) {
601 volume = 0.0f;
602 } else if (volume < -144.0f) {
603 volume = -144.0f;
604 }
605
606 /* Set volume in thread instead */
607 MUTEX_LOCK(raop_rtp->run_mutex);
608 raop_rtp->volume = volume;
609 MUTEX_UNLOCK(raop_rtp->run_mutex);
610 }
611
612 void
613 raop_rtp_set_metadata(raop_rtp_t *raop_rtp, const char *data, int datalen)
614 {
615 unsigned char *metadata;
616
617 assert(raop_rtp);
618
619 if (datalen <= 0) {
620 return;
621 }
622 metadata = malloc(datalen);
623 assert(metadata);
624 memcpy(metadata, data, datalen);
625
626 /* Set metadata in thread instead */
627 MUTEX_LOCK(raop_rtp->run_mutex);
628 raop_rtp->metadata = metadata;
629 raop_rtp->metadata_len = datalen;
630 MUTEX_UNLOCK(raop_rtp->run_mutex);
631 }
632
633 void
634 raop_rtp_set_coverart(raop_rtp_t *raop_rtp, const char *data, int datalen)
635 {
636 unsigned char *coverart;
637
638 assert(raop_rtp);
639
640 if (datalen <= 0) {
641 return;
642 }
643 coverart = malloc(datalen);
644 assert(coverart);
645 memcpy(coverart, data, datalen);
646
647 /* Set coverart in thread instead */
648 MUTEX_LOCK(raop_rtp->run_mutex);
649 raop_rtp->coverart = coverart;
650 raop_rtp->coverart_len = datalen;
651 MUTEX_UNLOCK(raop_rtp->run_mutex);
652 }
653
654 void
655 raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
656 {
657 assert(raop_rtp);
658
659 /* Call flush in thread instead */
660 MUTEX_LOCK(raop_rtp->run_mutex);
661 raop_rtp->flush = next_seq;
662 MUTEX_UNLOCK(raop_rtp->run_mutex);
663 }
664
665 void
666 raop_rtp_stop(raop_rtp_t *raop_rtp)
667 {
668 assert(raop_rtp);
669
670 /* Check that we are running and thread is not
671 * joined (should never be while still running) */
672 MUTEX_LOCK(raop_rtp->run_mutex);
673 if (!raop_rtp->running || raop_rtp->joined) {
674 MUTEX_UNLOCK(raop_rtp->run_mutex);
675 return;
676 }
677 raop_rtp->running = 0;
678 MUTEX_UNLOCK(raop_rtp->run_mutex);
679
680 /* Join the thread */
681 THREAD_JOIN(raop_rtp->thread);
682 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
683 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
684 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
685
686 /* Flush buffer into initial state */
687 raop_buffer_flush(raop_rtp->buffer, -1);
688
689 /* Mark thread as joined */
690 MUTEX_LOCK(raop_rtp->run_mutex);
691 raop_rtp->joined = 1;
692 MUTEX_UNLOCK(raop_rtp->run_mutex);
693 }