Pass metadata and coverart to the RTP thread
[deb_shairplay.git] / src / lib / raop_rtp.c
1 /**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <assert.h>
19 #include <errno.h>
20
21 #include "raop_rtp.h"
22 #include "raop.h"
23 #include "raop_buffer.h"
24 #include "netutils.h"
25 #include "utils.h"
26 #include "compat.h"
27 #include "logger.h"
28
/* Sentinel stored in raop_rtp->flush while no flush request is pending */
#define NO_FLUSH (-42)
30
31 struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
35 /* Buffer to handle all resends */
36 raop_buffer_t *buffer;
37
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
42 /* MUTEX LOCKED VARIABLES START */
43 /* These variables only edited mutex locked */
44 int running;
45 int joined;
46
47 float volume;
48 unsigned char *metadata;
49 int metadata_len;
50 unsigned char *coverart;
51 int coverart_len;
52
53 int flush;
54 thread_handle_t thread;
55 mutex_handle_t run_mutex;
56 /* MUTEX LOCKED VARIABLES END */
57
58 /* Remote control and timing ports */
59 unsigned short control_rport;
60 unsigned short timing_rport;
61
62 /* Sockets for control, timing and data */
63 int csock, tsock, dsock;
64
65 /* Local control, timing and data ports */
66 unsigned short control_lport;
67 unsigned short timing_lport;
68 unsigned short data_lport;
69
70 /* Initialized after the first control packet */
71 struct sockaddr_storage control_saddr;
72 socklen_t control_saddr_len;
73 unsigned short control_seqnum;
74 };
75
76 static int
77 raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
78 {
79 char *original;
80 char *current;
81 char *tmpstr;
82 int family;
83 int ret;
84
85 assert(raop_rtp);
86
87 current = original = strdup(remote);
88 if (!original) {
89 return -1;
90 }
91 tmpstr = utils_strsep(&current, " ");
92 if (strcmp(tmpstr, "IN")) {
93 free(original);
94 return -1;
95 }
96 tmpstr = utils_strsep(&current, " ");
97 if (!strcmp(tmpstr, "IP4") && current) {
98 family = AF_INET;
99 } else if (!strcmp(tmpstr, "IP6") && current) {
100 family = AF_INET6;
101 } else {
102 free(original);
103 return -1;
104 }
105 if (strstr(current, ":")) {
106 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
107 family = AF_INET6;
108 }
109 ret = netutils_parse_address(family, current,
110 &raop_rtp->remote_saddr,
111 sizeof(raop_rtp->remote_saddr));
112 if (ret < 0) {
113 free(original);
114 return -1;
115 }
116 raop_rtp->remote_saddr_len = ret;
117 free(original);
118 return 0;
119 }
120
121 raop_rtp_t *
122 raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
123 const char *fmtp, const unsigned char *aeskey, const unsigned char *aesiv)
124 {
125 raop_rtp_t *raop_rtp;
126
127 assert(logger);
128 assert(callbacks);
129 assert(remote);
130 assert(fmtp);
131
132 raop_rtp = calloc(1, sizeof(raop_rtp_t));
133 if (!raop_rtp) {
134 return NULL;
135 }
136 raop_rtp->logger = logger;
137 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
138 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
139 if (!raop_rtp->buffer) {
140 free(raop_rtp);
141 return NULL;
142 }
143 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
144 free(raop_rtp);
145 return NULL;
146 }
147
148 raop_rtp->running = 0;
149 raop_rtp->joined = 1;
150 raop_rtp->flush = NO_FLUSH;
151 MUTEX_CREATE(raop_rtp->run_mutex);
152
153 return raop_rtp;
154 }
155
156 void
157 raop_rtp_destroy(raop_rtp_t *raop_rtp)
158 {
159 if (raop_rtp) {
160 raop_rtp_stop(raop_rtp);
161
162 MUTEX_DESTROY(raop_rtp->run_mutex);
163 raop_buffer_destroy(raop_rtp->buffer);
164 free(raop_rtp->metadata);
165 free(raop_rtp->coverart);
166 free(raop_rtp);
167 }
168 }
169
170 static int
171 raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
172 {
173 int csock = -1, tsock = -1, dsock = -1;
174 unsigned short cport = 0, tport = 0, dport = 0;
175
176 assert(raop_rtp);
177
178 if (use_udp) {
179 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
180 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
181 if (csock == -1 || tsock == -1) {
182 goto sockets_cleanup;
183 }
184 }
185 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
186 if (dsock == -1) {
187 goto sockets_cleanup;
188 }
189
190 /* Listen to the data socket if using TCP */
191 if (!use_udp) {
192 if (listen(dsock, 1) < 0)
193 goto sockets_cleanup;
194 }
195
196 /* Set socket descriptors */
197 raop_rtp->csock = csock;
198 raop_rtp->tsock = tsock;
199 raop_rtp->dsock = dsock;
200
201 /* Set port values */
202 raop_rtp->control_lport = cport;
203 raop_rtp->timing_lport = tport;
204 raop_rtp->data_lport = dport;
205 return 0;
206
207 sockets_cleanup:
208 if (csock != -1) closesocket(csock);
209 if (tsock != -1) closesocket(tsock);
210 if (dsock != -1) closesocket(dsock);
211 return -1;
212 }
213
214 static int
215 raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
216 {
217 raop_rtp_t *raop_rtp = opaque;
218 unsigned char packet[8];
219 unsigned short ourseqnum;
220 struct sockaddr *addr;
221 socklen_t addrlen;
222 int ret;
223
224 addr = (struct sockaddr *)&raop_rtp->control_saddr;
225 addrlen = raop_rtp->control_saddr_len;
226
227 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
228 ourseqnum = raop_rtp->control_seqnum++;
229
230 /* Fill the request buffer */
231 packet[0] = 0x80;
232 packet[1] = 0x55|0x80;
233 packet[2] = (ourseqnum >> 8);
234 packet[3] = ourseqnum;
235 packet[4] = (seqnum >> 8);
236 packet[5] = seqnum;
237 packet[6] = (count >> 8);
238 packet[7] = count;
239
240 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
241 if (ret == -1) {
242 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
243 }
244
245 return 0;
246 }
247
248 static THREAD_RETVAL
249 raop_rtp_thread_udp(void *arg)
250 {
251 raop_rtp_t *raop_rtp = arg;
252 unsigned char packet[RAOP_PACKET_LEN];
253 unsigned int packetlen;
254 struct sockaddr_storage saddr;
255 socklen_t saddrlen;
256 float volume = 0.0;
257
258 const ALACSpecificConfig *config;
259 void *cb_data = NULL;
260
261 assert(raop_rtp);
262
263 config = raop_buffer_get_config(raop_rtp->buffer);
264 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
265 config->bitDepth,
266 config->numChannels,
267 config->sampleRate);
268
269 while(1) {
270 int volume_changed;
271 int flush;
272
273 fd_set rfds;
274 struct timeval tv;
275 int nfds, ret;
276
277 MUTEX_LOCK(raop_rtp->run_mutex);
278 if (!raop_rtp->running) {
279 MUTEX_UNLOCK(raop_rtp->run_mutex);
280 break;
281 }
282 /* Read the volume level */
283 volume_changed = (volume != raop_rtp->volume);
284 volume = raop_rtp->volume;
285
286 /* Read the flush value */
287 flush = raop_rtp->flush;
288 raop_rtp->flush = NO_FLUSH;
289 MUTEX_UNLOCK(raop_rtp->run_mutex);
290
291 /* Call set_volume callback if changed */
292 if (volume_changed) {
293 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
294 }
295 if (flush != NO_FLUSH) {
296 raop_buffer_flush(raop_rtp->buffer, flush);
297 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
298 }
299
300 /* Set timeout value to 5ms */
301 tv.tv_sec = 0;
302 tv.tv_usec = 5000;
303
304 /* Get the correct nfds value */
305 nfds = raop_rtp->csock+1;
306 if (raop_rtp->tsock >= nfds)
307 nfds = raop_rtp->tsock+1;
308 if (raop_rtp->dsock >= nfds)
309 nfds = raop_rtp->dsock+1;
310
311 /* Set rfds and call select */
312 FD_ZERO(&rfds);
313 FD_SET(raop_rtp->csock, &rfds);
314 FD_SET(raop_rtp->tsock, &rfds);
315 FD_SET(raop_rtp->dsock, &rfds);
316 ret = select(nfds, &rfds, NULL, NULL, &tv);
317 if (ret == 0) {
318 /* Timeout happened */
319 continue;
320 } else if (ret == -1) {
321 /* FIXME: Error happened */
322 break;
323 }
324
325 if (FD_ISSET(raop_rtp->csock, &rfds)) {
326 saddrlen = sizeof(saddr);
327 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
328 (struct sockaddr *)&saddr, &saddrlen);
329
330 /* Get the destination address here, because we need the sin6_scope_id */
331 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
332 raop_rtp->control_saddr_len = saddrlen;
333
334 if (packetlen >= 12) {
335 char type = packet[1] & ~0x80;
336
337 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
338 if (type == 0x56) {
339 /* Handle resent data packet */
340 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
341 assert(ret >= 0);
342 }
343 }
344 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
345 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
346 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
347 saddrlen = sizeof(saddr);
348 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
349 (struct sockaddr *)&saddr, &saddrlen);
350 if (packetlen >= 12) {
351 int no_resend = (raop_rtp->control_rport == 0);
352 int ret;
353
354 const void *audiobuf;
355 int audiobuflen;
356
357 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
358 assert(ret >= 0);
359
360 /* Decode all frames in queue */
361 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
362 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
363 }
364
365 /* Handle possible resend requests */
366 if (!no_resend) {
367 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
368 }
369 }
370 }
371 }
372 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
373 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
374
375 return 0;
376 }
377
378 static THREAD_RETVAL
379 raop_rtp_thread_tcp(void *arg)
380 {
381 raop_rtp_t *raop_rtp = arg;
382 int stream_fd = -1;
383 unsigned char packet[RAOP_PACKET_LEN];
384 unsigned int packetlen = 0;
385 float volume = 0.0;
386
387 const ALACSpecificConfig *config;
388 void *cb_data = NULL;
389
390 assert(raop_rtp);
391
392 config = raop_buffer_get_config(raop_rtp->buffer);
393 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
394 config->bitDepth,
395 config->numChannels,
396 config->sampleRate);
397
398 while (1) {
399 int volume_changed;
400
401 fd_set rfds;
402 struct timeval tv;
403 int nfds, ret;
404
405 MUTEX_LOCK(raop_rtp->run_mutex);
406 if (!raop_rtp->running) {
407 MUTEX_UNLOCK(raop_rtp->run_mutex);
408 break;
409 }
410 volume_changed = (volume != raop_rtp->volume);
411 volume = raop_rtp->volume;
412 MUTEX_UNLOCK(raop_rtp->run_mutex);
413
414 /* Call set_volume callback if changed */
415 if (volume_changed) {
416 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
417 }
418
419 /* Set timeout value to 5ms */
420 tv.tv_sec = 0;
421 tv.tv_usec = 5000;
422
423 /* Get the correct nfds value and set rfds */
424 FD_ZERO(&rfds);
425 if (stream_fd == -1) {
426 FD_SET(raop_rtp->dsock, &rfds);
427 nfds = raop_rtp->dsock+1;
428 } else {
429 FD_SET(stream_fd, &rfds);
430 nfds = stream_fd+1;
431 }
432 ret = select(nfds, &rfds, NULL, NULL, &tv);
433 if (ret == 0) {
434 /* Timeout happened */
435 continue;
436 } else if (ret == -1) {
437 /* FIXME: Error happened */
438 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
439 break;
440 }
441 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
442 struct sockaddr_storage saddr;
443 socklen_t saddrlen;
444
445 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
446 saddrlen = sizeof(saddr);
447 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
448 if (stream_fd == -1) {
449 /* FIXME: Error happened */
450 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
451 break;
452 }
453 }
454 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
455 unsigned int rtplen=0;
456
457 const void *audiobuf;
458 int audiobuflen;
459
460 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
461 if (ret == 0) {
462 /* TCP socket closed */
463 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
464 break;
465 } else if (ret == -1) {
466 /* FIXME: Error happened */
467 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
468 break;
469 }
470 packetlen += ret;
471
472 /* Check that we have enough bytes */
473 if (packetlen < 4) {
474 continue;
475 }
476 if (packet[0] != '$' || packet[1] != '\0') {
477 /* FIXME: Incorrect RTP magic bytes */
478 break;
479 }
480 rtplen = (packet[2] << 8) | packet[3];
481 if (rtplen > sizeof(packet)) {
482 /* FIXME: Too long packet */
483 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
484 break;
485 }
486 if (packetlen < 4+rtplen) {
487 continue;
488 }
489
490 /* Packet is valid, process it */
491 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
492 assert(ret >= 0);
493
494 /* Remove processed bytes from packet buffer */
495 memmove(packet, packet+4+rtplen, packetlen-rtplen);
496 packetlen -= 4+rtplen;
497
498 /* Decode the received frame */
499 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
500 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
501 }
502 }
503 }
504
505 /* Close the stream file descriptor */
506 if (stream_fd != -1) {
507 closesocket(stream_fd);
508 }
509
510 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
511 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
512
513 return 0;
514 }
515
516 void
517 raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
518 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
519 {
520 int use_ipv6 = 0;
521
522 assert(raop_rtp);
523
524 MUTEX_LOCK(raop_rtp->run_mutex);
525 if (raop_rtp->running || !raop_rtp->joined) {
526 MUTEX_UNLOCK(raop_rtp->run_mutex);
527 return;
528 }
529
530 /* Initialize ports and sockets */
531 raop_rtp->control_rport = control_rport;
532 raop_rtp->timing_rport = timing_rport;
533 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
534 use_ipv6 = 1;
535 }
536 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
537 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
538 MUTEX_UNLOCK(raop_rtp->run_mutex);
539 return;
540 }
541 if (control_lport) *control_lport = raop_rtp->control_lport;
542 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
543 if (data_lport) *data_lport = raop_rtp->data_lport;
544
545 /* Create the thread and initialize running values */
546 raop_rtp->running = 1;
547 raop_rtp->joined = 0;
548 if (use_udp) {
549 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
550 } else {
551 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
552 }
553 MUTEX_UNLOCK(raop_rtp->run_mutex);
554 }
555
556 void
557 raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
558 {
559 assert(raop_rtp);
560
561 if (volume > 0.0f) {
562 volume = 0.0f;
563 } else if (volume < -144.0f) {
564 volume = -144.0f;
565 }
566
567 /* Set volume in thread instead */
568 MUTEX_LOCK(raop_rtp->run_mutex);
569 raop_rtp->volume = volume;
570 MUTEX_UNLOCK(raop_rtp->run_mutex);
571 }
572
573 void
574 raop_rtp_set_metadata(raop_rtp_t *raop_rtp, const char *data, int datalen)
575 {
576 unsigned char *metadata;
577
578 assert(raop_rtp);
579
580 if (datalen <= 0) {
581 return;
582 }
583 metadata = malloc(datalen);
584 assert(metadata);
585 memcpy(metadata, data, datalen);
586
587 /* Set metadata in thread instead */
588 MUTEX_LOCK(raop_rtp->run_mutex);
589 raop_rtp->metadata = metadata;
590 raop_rtp->metadata_len = datalen;
591 MUTEX_UNLOCK(raop_rtp->run_mutex);
592 }
593
594 void
595 raop_rtp_set_coverart(raop_rtp_t *raop_rtp, const char *data, int datalen)
596 {
597 unsigned char *coverart;
598
599 assert(raop_rtp);
600
601 if (datalen <= 0) {
602 return;
603 }
604 coverart = malloc(datalen);
605 assert(coverart);
606 memcpy(coverart, data, datalen);
607
608 /* Set coverart in thread instead */
609 MUTEX_LOCK(raop_rtp->run_mutex);
610 raop_rtp->coverart = coverart;
611 raop_rtp->coverart_len = datalen;
612 MUTEX_UNLOCK(raop_rtp->run_mutex);
613 }
614
615 void
616 raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
617 {
618 assert(raop_rtp);
619
620 /* Call flush in thread instead */
621 MUTEX_LOCK(raop_rtp->run_mutex);
622 raop_rtp->flush = next_seq;
623 MUTEX_UNLOCK(raop_rtp->run_mutex);
624 }
625
626 void
627 raop_rtp_stop(raop_rtp_t *raop_rtp)
628 {
629 assert(raop_rtp);
630
631 /* Check that we are running and thread is not
632 * joined (should never be while still running) */
633 MUTEX_LOCK(raop_rtp->run_mutex);
634 if (!raop_rtp->running || raop_rtp->joined) {
635 MUTEX_UNLOCK(raop_rtp->run_mutex);
636 return;
637 }
638 raop_rtp->running = 0;
639 MUTEX_UNLOCK(raop_rtp->run_mutex);
640
641 /* Join the thread */
642 THREAD_JOIN(raop_rtp->thread);
643 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
644 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
645 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
646
647 /* Flush buffer into initial state */
648 raop_buffer_flush(raop_rtp->buffer, -1);
649
650 /* Mark thread as joined */
651 MUTEX_LOCK(raop_rtp->run_mutex);
652 raop_rtp->joined = 1;
653 MUTEX_UNLOCK(raop_rtp->run_mutex);
654 }