2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
23 #include "raop_buffer.h"
/* Sentinel kept in the `flush` field while no flush is pending.
 * raop_rtp_init and raop_rtp_process_events both store it there, and
 * raop_rtp_process_events only performs a flush when `flush` differs
 * from it. */
29 #define NO_FLUSH (-42)
/* Members of the RTP session state that the functions in this file reach
 * through a raop_rtp_t pointer.
 *
 * NOTE(review): the struct header and several members used further down
 * (logger, running, joined, volume, volume_changed, metadata_len,
 * coverart_len, flush) are not present in this view; confirm the layout
 * against the full source. */
/* Copy of the caller's callback table (filled by memcpy in raop_rtp_init) */
33 raop_callbacks_t callbacks
;
35 /* Buffer to handle all resends */
36 raop_buffer_t
*buffer
;
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr
;
40 socklen_t remote_saddr_len
;
42 /* MUTEX LOCKED VARIABLES START */
43 /* These variables are only edited with run_mutex locked */
/* Pending metadata buffer; detached by raop_rtp_process_events */
49 unsigned char *metadata
;
/* Pending cover art buffer; detached by raop_rtp_process_events */
51 unsigned char *coverart
;
55 thread_handle_t thread
;
56 mutex_handle_t run_mutex
;
57 /* MUTEX LOCKED VARIABLES END */
59 /* Remote control and timing ports */
60 unsigned short control_rport
;
61 unsigned short timing_rport
;
63 /* Sockets for control, timing and data */
64 int csock
, tsock
, dsock
;
66 /* Local control, timing and data ports */
67 unsigned short control_lport
;
68 unsigned short timing_lport
;
69 unsigned short data_lport
;
71 /* Initialized after the first control packet */
72 struct sockaddr_storage control_saddr
;
73 socklen_t control_saddr_len
;
/* Sequence number used for outgoing resend requests; post-incremented
 * by raop_rtp_resend_callback */
74 unsigned short control_seqnum
;
/* Parse the textual remote address `remote` into raop_rtp->remote_saddr.
 *
 * The visible code duplicates `remote`, takes two space-separated tokens
 * off the copy with utils_strsep, compares the first with "IN" and the
 * second with "IP4" / "IP6", then passes the remaining text to
 * netutils_parse_address and stores that call's result in
 * raop_rtp->remote_saddr_len.  raop_rtp_init checks this function's
 * return value for being negative.
 *
 * NOTE(review): the lines that assign `family`, return, and release the
 * strdup copy are not present in this view; confirm against the full
 * source. */
78 raop_rtp_parse_remote(raop_rtp_t
*raop_rtp
, const char *remote
)
88 current
= original
= strdup(remote
);
/* NOTE(review): `¤t` on the two utils_strsep lines looks like a
 * mis-decoded `&current` (the `&curren` prefix turned into an HTML
 * entity); confirm against the real source. */
92 tmpstr
= utils_strsep(¤t
, " ");
93 if (strcmp(tmpstr
, "IN")) {
/* NOTE(review): if utils_strsep behaves like strsep(3), an input with no
 * text after "IN" would make this second token NULL, and it is handed to
 * strcmp before `current` is tested; confirm. */
97 tmpstr
= utils_strsep(¤t
, " ");
98 if (!strcmp(tmpstr
, "IP4") && current
) {
100 } else if (!strcmp(tmpstr
, "IP6") && current
) {
106 if (strstr(current
, ":")) {
107 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
110 ret
= netutils_parse_address(family
, current
,
111 &raop_rtp
->remote_saddr
,
112 sizeof(raop_rtp
->remote_saddr
));
/* The value returned by netutils_parse_address is kept as the address length */
117 raop_rtp
->remote_saddr_len
= ret
;
/* Create a new RTP session object.
 *
 * Visible steps: allocate a zero-filled raop_rtp_t with calloc, store
 * `logger`, copy the caller's callback table by value, create the packet
 * buffer with raop_buffer_init(rtpmap, fmtp, aeskey, aesiv), parse
 * `remote` with raop_rtp_parse_remote, then set the initial state
 * (running = 0, joined = 1, flush = NO_FLUSH) and create run_mutex.
 *
 * NOTE(review): the bodies of the two failure branches and the return
 * statements are not present in this view; confirm what is released and
 * returned on failure. */
123 raop_rtp_init(logger_t
*logger
, raop_callbacks_t
*callbacks
, const char *remote
,
124 const char *rtpmap
, const char *fmtp
,
125 const unsigned char *aeskey
, const unsigned char *aesiv
)
127 raop_rtp_t
*raop_rtp
;
135 raop_rtp
= calloc(1, sizeof(raop_rtp_t
));
139 raop_rtp
->logger
= logger
;
/* The callback table is copied, so the caller's struct need not outlive this call */
140 memcpy(&raop_rtp
->callbacks
, callbacks
, sizeof(raop_callbacks_t
));
141 raop_rtp
->buffer
= raop_buffer_init(rtpmap
, fmtp
, aeskey
, aesiv
);
142 if (!raop_rtp
->buffer
) {
146 if (raop_rtp_parse_remote(raop_rtp
, remote
) < 0) {
/* Start in the "not running, already joined" state that raop_rtp_start requires */
151 raop_rtp
->running
= 0;
152 raop_rtp
->joined
= 1;
153 raop_rtp
->flush
= NO_FLUSH
;
154 MUTEX_CREATE(raop_rtp
->run_mutex
);
/* Tear down an RTP session object.
 *
 * Visible steps: stop the session with raop_rtp_stop, destroy run_mutex,
 * destroy the packet buffer, and free any metadata / cover art buffers
 * still pending in the object.
 *
 * NOTE(review): a NULL guard and the release of `raop_rtp` itself are not
 * present in this view; confirm against the full source. */
160 raop_rtp_destroy(raop_rtp_t
*raop_rtp
)
163 raop_rtp_stop(raop_rtp
);
165 MUTEX_DESTROY(raop_rtp
->run_mutex
);
166 raop_buffer_destroy(raop_rtp
->buffer
);
167 free(raop_rtp
->metadata
);
168 free(raop_rtp
->coverart
);
/* Create the control, timing and data sockets for a session.
 *
 * Each socket comes from netutils_init_socket, which is also given a
 * pointer to a port variable (presumably filled with the bound local
 * port - confirm).  On success the descriptors and ports are stored in
 * `raop_rtp`.  Failures jump to sockets_cleanup, where every descriptor
 * that is not -1 is closed.  raop_rtp_start checks this function's return
 * value for being negative.
 *
 * NOTE(review): the conditions guarding some of the visible statements
 * (for instance the second `goto sockets_cleanup` and the listen call),
 * the label itself and the return statements are not present in this
 * view. */
174 raop_rtp_init_sockets(raop_rtp_t
*raop_rtp
, int use_ipv6
, int use_udp
)
/* -1 marks "not opened" so the cleanup path knows what to close */
176 int csock
= -1, tsock
= -1, dsock
= -1;
177 unsigned short cport
= 0, tport
= 0, dport
= 0;
182 csock
= netutils_init_socket(&cport
, use_ipv6
, use_udp
);
183 tsock
= netutils_init_socket(&tport
, use_ipv6
, use_udp
);
184 if (csock
== -1 || tsock
== -1) {
185 goto sockets_cleanup
;
188 dsock
= netutils_init_socket(&dport
, use_ipv6
, use_udp
);
190 goto sockets_cleanup
;
193 /* Listen to the data socket if using TCP */
195 if (listen(dsock
, 1) < 0)
196 goto sockets_cleanup
;
199 /* Set socket descriptors */
200 raop_rtp
->csock
= csock
;
201 raop_rtp
->tsock
= tsock
;
202 raop_rtp
->dsock
= dsock
;
204 /* Set port values */
205 raop_rtp
->control_lport
= cport
;
206 raop_rtp
->timing_lport
= tport
;
207 raop_rtp
->data_lport
= dport
;
/* Cleanup path: close whichever sockets were opened */
211 if (csock
!= -1) closesocket(csock
);
212 if (tsock
!= -1) closesocket(tsock
);
213 if (dsock
!= -1) closesocket(dsock
);
/* Send a resend request for `count` packets starting at `seqnum`.
 *
 * `opaque` is the raop_rtp_t that raop_rtp_thread_udp passes to
 * raop_buffer_handle_resends together with this function.  An 8-byte
 * request is built and sent with sendto on the control socket to the
 * address stored in control_saddr.  Visible byte assignments: [1] is
 * 0x55|0x80, [2..3] hold our own control sequence number (high byte
 * first), [4] is the high byte of `seqnum`, [6] the high byte of `count`.
 * A warning with the socket error is logged ("Resend failed").
 *
 * NOTE(review): the assignments of bytes 0, 5 and 7, the condition around
 * the warning and the return statement are not present in this view.
 * NOTE(review): control_saddr is only filled once a control packet has
 * been received in raop_rtp_thread_udp; before that the calloc'ed object
 * leaves control_saddr_len at 0 - confirm a resend cannot be requested
 * that early. */
218 raop_rtp_resend_callback(void *opaque
, unsigned short seqnum
, unsigned short count
)
220 raop_rtp_t
*raop_rtp
= opaque
;
221 unsigned char packet
[8];
222 unsigned short ourseqnum
;
223 struct sockaddr
*addr
;
227 addr
= (struct sockaddr
*)&raop_rtp
->control_saddr
;
228 addrlen
= raop_rtp
->control_saddr_len
;
230 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got resend request %d %d", seqnum
, count
);
/* Take the current control sequence number and advance it for the next request */
231 ourseqnum
= raop_rtp
->control_seqnum
++;
233 /* Fill the request buffer */
235 packet
[1] = 0x55|0x80;
236 packet
[2] = (ourseqnum
>> 8);
237 packet
[3] = ourseqnum
;
238 packet
[4] = (seqnum
>> 8);
240 packet
[6] = (count
>> 8);
243 ret
= sendto(raop_rtp
->csock
, (const char *)packet
, sizeof(packet
), 0, addr
, addrlen
);
245 logger_log(raop_rtp
->logger
, LOGGER_WARNING
, "Resend failed: %d", SOCKET_GET_ERROR());
/* Pick up requests posted by other threads and deliver them as callbacks.
 *
 * With run_mutex held, the function checks `running`, then takes a
 * snapshot of the pending volume, flush, metadata and cover art values
 * and resets the corresponding fields (volume_changed = 0,
 * flush = NO_FLUSH, metadata / coverart = NULL with length 0).  After the
 * mutex is released it invokes the matching callbacks, each only when its
 * function pointer is non-NULL, passing callbacks.cls and `cb_data`.  A
 * requested flush also calls raop_buffer_flush on the session buffer.
 *
 * Both thread functions use the result as an `if` condition under the
 * comment "Check if we are still running".
 *
 * NOTE(review): the return statements are not present in this view.
 * NOTE(review): the metadata and cover art buffers are detached from the
 * object here, so this function holds the only reference afterwards; the
 * lines releasing them are not visible - confirm they are freed after the
 * callbacks. */
252 raop_rtp_process_events(raop_rtp_t
*raop_rtp
, void *cb_data
)
257 unsigned char *metadata
;
259 unsigned char *coverart
;
264 MUTEX_LOCK(raop_rtp
->run_mutex
);
265 if (!raop_rtp
->running
) {
266 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
270 /* Read the volume level */
271 volume
= raop_rtp
->volume
;
272 volume_changed
= raop_rtp
->volume_changed
;
273 raop_rtp
->volume_changed
= 0;
275 /* Read the flush value */
276 flush
= raop_rtp
->flush
;
277 raop_rtp
->flush
= NO_FLUSH
;
279 /* Read the metadata */
280 metadata
= raop_rtp
->metadata
;
281 metadata_len
= raop_rtp
->metadata_len
;
282 raop_rtp
->metadata
= NULL
;
283 raop_rtp
->metadata_len
= 0;
285 /* Read the coverart */
286 coverart
= raop_rtp
->coverart
;
287 coverart_len
= raop_rtp
->coverart_len
;
288 raop_rtp
->coverart
= NULL
;
289 raop_rtp
->coverart_len
= 0;
290 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* From here on only the local snapshot is used, so callbacks run without the mutex */
292 /* Call set_volume callback if changed */
293 if (volume_changed
) {
294 if (raop_rtp
->callbacks
.audio_set_volume
) {
295 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
299 /* Handle flush if requested */
300 if (flush
!= NO_FLUSH
) {
301 raop_buffer_flush(raop_rtp
->buffer
, flush
);
302 if (raop_rtp
->callbacks
.audio_flush
) {
303 raop_rtp
->callbacks
.audio_flush(raop_rtp
->callbacks
.cls
, cb_data
);
/* Deliver pending metadata, if any */
306 if (metadata
!= NULL
) {
307 if (raop_rtp
->callbacks
.audio_set_metadata
) {
308 raop_rtp
->callbacks
.audio_set_metadata(raop_rtp
->callbacks
.cls
, cb_data
, metadata
, metadata_len
);
/* Deliver pending cover art, if any */
313 if (coverart
!= NULL
) {
314 if (raop_rtp
->callbacks
.audio_set_coverart
) {
315 raop_rtp
->callbacks
.audio_set_coverart(raop_rtp
->callbacks
.cls
, cb_data
, coverart
, coverart_len
);
/* Thread entry point for UDP transport; `arg` is the raop_rtp_t session.
 *
 * Visible flow: fetch the stream configuration from the buffer, obtain
 * the audio context `cb_data` from the audio_init callback, then loop:
 * call raop_rtp_process_events, wait with select() on the control,
 * timing and data sockets, and service whichever socket is readable.
 * Control packets update control_saddr and may carry a resent data
 * packet, which is queued from offset 4.  Data packets are queued, every
 * decodable frame is handed to audio_process, and
 * raop_buffer_handle_resends is called with raop_rtp_resend_callback.
 * On exit the audio context is released with audio_destroy.
 *
 * NOTE(review): the loop header, the timeout setup, FD_ZERO, several
 * conditions and the return statement are not present in this view. */
324 raop_rtp_thread_udp(void *arg
)
326 raop_rtp_t
*raop_rtp
= arg
;
327 unsigned char packet
[RAOP_PACKET_LEN
];
328 unsigned int packetlen
;
329 struct sockaddr_storage saddr
;
332 const ALACSpecificConfig
*config
;
333 void *cb_data
= NULL
;
337 config
= raop_buffer_get_config(raop_rtp
->buffer
);
338 cb_data
= raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
,
348 /* Check if we are still running and process callbacks */
349 if (raop_rtp_process_events(raop_rtp
, cb_data
)) {
353 /* Set timeout value to 5ms */
357 /* Get the correct nfds value */
/* select() needs the highest descriptor plus one */
358 nfds
= raop_rtp
->csock
+1;
359 if (raop_rtp
->tsock
>= nfds
)
360 nfds
= raop_rtp
->tsock
+1;
361 if (raop_rtp
->dsock
>= nfds
)
362 nfds
= raop_rtp
->dsock
+1;
364 /* Set rfds and call select */
366 FD_SET(raop_rtp
->csock
, &rfds
);
367 FD_SET(raop_rtp
->tsock
, &rfds
);
368 FD_SET(raop_rtp
->dsock
, &rfds
);
369 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
371 /* Timeout happened */
373 } else if (ret
== -1) {
374 /* FIXME: Error happened */
/* The three sockets are tested in an if / else-if chain, so at most one
 * of them is serviced per loop iteration */
378 if (FD_ISSET(raop_rtp
->csock
, &rfds
)) {
379 saddrlen
= sizeof(saddr
);
/* NOTE(review): packetlen is unsigned int, so an error result of -1 from
 * recvfrom would become a very large value and pass the `>= 12` test
 * below unless it is rejected on a line not shown here; confirm. */
380 packetlen
= recvfrom(raop_rtp
->csock
, (char *)packet
, sizeof(packet
), 0,
381 (struct sockaddr
*)&saddr
, &saddrlen
);
383 /* Get the destination address here, because we need the sin6_scope_id */
384 memcpy(&raop_rtp
->control_saddr
, &saddr
, saddrlen
);
385 raop_rtp
->control_saddr_len
= saddrlen
;
387 if (packetlen
>= 12) {
/* Packet type is byte 1 with the top bit cleared */
388 char type
= packet
[1] & ~0x80;
390 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got control packet of type 0x%02x", type
);
392 /* Handle resent data packet */
/* NOTE(review): this declares a new `ret` that shadows the one assigned
 * from select() above. */
393 int ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, packetlen
-4, 1);
397 } else if (FD_ISSET(raop_rtp
->tsock
, &rfds
)) {
398 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Would have timing packet in queue");
399 } else if (FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
400 saddrlen
= sizeof(saddr
);
/* NOTE(review): same unsigned packetlen concern as for the control socket. */
401 packetlen
= recvfrom(raop_rtp
->dsock
, (char *)packet
, sizeof(packet
), 0,
402 (struct sockaddr
*)&saddr
, &saddrlen
);
403 if (packetlen
>= 12) {
/* no_resend is set when the session has no remote control port */
404 int no_resend
= (raop_rtp
->control_rport
== 0);
407 const void *audiobuf
;
410 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
, packetlen
, 1);
413 /* Decode all frames in queue */
414 while ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, no_resend
))) {
415 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
418 /* Handle possible resend requests */
420 raop_buffer_handle_resends(raop_rtp
->buffer
, raop_rtp_resend_callback
, raop_rtp
);
425 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting UDP RAOP thread");
426 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
/* Thread entry point for TCP transport; `arg` is the raop_rtp_t session.
 *
 * Visible flow: fetch the stream configuration, obtain the audio context
 * `cb_data` from audio_init, then loop: call raop_rtp_process_events and
 * select() on either the listening data socket (while no client is
 * connected, stream_fd == -1) or the accepted stream.  Received bytes are
 * appended to `packet` at offset `packetlen`.  A frame starts with '$',
 * a zero byte and a 16-bit length (high byte first) followed by that many
 * payload bytes.  A complete frame is queued with raop_buffer_queue, the
 * consumed bytes are removed from the front of `packet`, and one decoded
 * frame (if available) is handed to audio_process.  On exit the stream
 * descriptor is closed and audio_destroy is called.
 *
 * NOTE(review): the loop header, timeout setup, FD_ZERO, several
 * conditions / branch bodies and the return statement are not present in
 * this view. */
432 raop_rtp_thread_tcp(void *arg
)
434 raop_rtp_t
*raop_rtp
= arg
;
436 unsigned char packet
[RAOP_PACKET_LEN
];
/* Number of bytes currently buffered in `packet` */
437 unsigned int packetlen
= 0;
439 const ALACSpecificConfig
*config
;
440 void *cb_data
= NULL
;
444 config
= raop_buffer_get_config(raop_rtp
->buffer
);
445 cb_data
= raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
,
455 /* Check if we are still running and process callbacks */
456 if (raop_rtp_process_events(raop_rtp
, cb_data
)) {
460 /* Set timeout value to 5ms */
464 /* Get the correct nfds value and set rfds */
466 if (stream_fd
== -1) {
467 FD_SET(raop_rtp
->dsock
, &rfds
);
468 nfds
= raop_rtp
->dsock
+1;
470 FD_SET(stream_fd
, &rfds
);
473 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
475 /* Timeout happened */
477 } else if (ret
== -1) {
478 /* FIXME: Error happened */
479 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in select");
/* No client yet and the listening socket is readable: accept the connection */
482 if (stream_fd
== -1 && FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
483 struct sockaddr_storage saddr
;
486 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Accepting client");
487 saddrlen
= sizeof(saddr
);
488 stream_fd
= accept(raop_rtp
->dsock
, (struct sockaddr
*)&saddr
, &saddrlen
);
489 if (stream_fd
== -1) {
490 /* FIXME: Error happened */
491 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in accept %d %s", errno
, strerror(errno
));
495 if (stream_fd
!= -1 && FD_ISSET(stream_fd
, &rfds
)) {
496 unsigned int rtplen
=0;
498 const void *audiobuf
;
/* Append to whatever is already buffered; never read past the end of `packet` */
501 ret
= recv(stream_fd
, (char *)(packet
+packetlen
), sizeof(packet
)-packetlen
, 0);
503 /* TCP socket closed */
504 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "TCP socket closed");
506 } else if (ret
== -1) {
507 /* FIXME: Error happened */
508 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in recv");
513 /* Check that we have enough bytes */
517 if (packet
[0] != '$' || packet
[1] != '\0') {
518 /* FIXME: Incorrect RTP magic bytes */
521 rtplen
= (packet
[2] << 8) | packet
[3];
/* NOTE(review): a whole frame needs 4+rtplen bytes in `packet`, but this
 * bound only rejects rtplen > sizeof(packet); a length within 4 bytes of
 * the buffer size passes here yet can never satisfy the completeness
 * test below - confirm whether the bound should include the header.
 * NOTE(review): rtplen is unsigned int but is logged with %d. */
522 if (rtplen
> sizeof(packet
)) {
523 /* FIXME: Too long packet */
524 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error, packet too long %d", rtplen
);
/* Frame not complete yet: wait for more bytes */
527 if (packetlen
< 4+rtplen
) {
531 /* Packet is valid, process it */
532 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, rtplen
, 0);
535 /* Remove processed bytes from packet buffer */
/* NOTE(review): the source starts at packet+4+rtplen, so only
 * packetlen-4-rtplen bytes remain, while the length passed is
 * packetlen-rtplen - that is 4 bytes more than were received and can
 * reach past the end of `packet` when the buffer is nearly full; looks
 * like an off-by-header-size, confirm. */
536 memmove(packet
, packet
+4+rtplen
, packetlen
-rtplen
);
537 packetlen
-= 4+rtplen
;
539 /* Decode the received frame */
540 if ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, 1))) {
541 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
546 /* Close the stream file descriptor */
547 if (stream_fd
!= -1) {
548 closesocket(stream_fd
);
551 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting TCP RAOP thread");
552 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
/* Start the receiver thread for a session.
 *
 * With run_mutex held: bail out if the session is already running or its
 * previous thread has not been joined; store the remote control / timing
 * ports; create the sockets with raop_rtp_init_sockets (an IPv6 remote
 * address is detected through remote_saddr.ss_family); report the local
 * ports through `control_lport`, `timing_lport` and `data_lport`, each of
 * which may be NULL; mark the session running and not joined; and create
 * the thread.
 *
 * NOTE(review): two THREAD_CREATE calls are visible, one for
 * raop_rtp_thread_udp and one for raop_rtp_thread_tcp; the condition that
 * selects between them is not shown (presumably `use_udp` - confirm).
 * The assignment of `use_ipv6` and the return statements are also not
 * present in this view. */
558 raop_rtp_start(raop_rtp_t
*raop_rtp
, int use_udp
, unsigned short control_rport
, unsigned short timing_rport
,
559 unsigned short *control_lport
, unsigned short *timing_lport
, unsigned short *data_lport
)
565 MUTEX_LOCK(raop_rtp
->run_mutex
);
566 if (raop_rtp
->running
|| !raop_rtp
->joined
) {
567 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
571 /* Initialize ports and sockets */
572 raop_rtp
->control_rport
= control_rport
;
573 raop_rtp
->timing_rport
= timing_rport
;
574 if (raop_rtp
->remote_saddr
.ss_family
== AF_INET6
) {
577 if (raop_rtp_init_sockets(raop_rtp
, use_ipv6
, use_udp
) < 0) {
578 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Initializing sockets failed");
579 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Output pointers are optional */
582 if (control_lport
) *control_lport
= raop_rtp
->control_lport
;
583 if (timing_lport
) *timing_lport
= raop_rtp
->timing_lport
;
584 if (data_lport
) *data_lport
= raop_rtp
->data_lport
;
586 /* Create the thread and initialize running values */
587 raop_rtp
->running
= 1;
588 raop_rtp
->joined
= 0;
590 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_udp
, raop_rtp
);
592 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_tcp
, raop_rtp
);
594 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Post a volume change for the receiver thread.
 *
 * The value is stored in the session under run_mutex and volume_changed
 * is set; raop_rtp_process_events later passes it to the
 * audio_set_volume callback.
 *
 * NOTE(review): only one arm of a range check (`volume < -144.0f`) is
 * visible; the other arm and the values assigned in both are not present
 * in this view. */
598 raop_rtp_set_volume(raop_rtp_t
*raop_rtp
, float volume
)
604 } else if (volume
< -144.0f
) {
608 /* Set volume in thread instead */
609 MUTEX_LOCK(raop_rtp
->run_mutex
);
610 raop_rtp
->volume
= volume
;
611 raop_rtp
->volume_changed
= 1;
612 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Post new metadata for the receiver thread.
 *
 * A private copy of `data` (`datalen` bytes) is made with malloc + memcpy
 * and stored in the session under run_mutex; raop_rtp_process_events
 * later detaches it and passes it to the audio_set_metadata callback.
 *
 * NOTE(review): validation of `datalen` and of the malloc result is not
 * present in this view.
 * NOTE(review): the previous raop_rtp->metadata pointer is overwritten
 * without a visible free, so a buffer that is still pending when this is
 * called again would leak - confirm. */
616 raop_rtp_set_metadata(raop_rtp_t
*raop_rtp
, const char *data
, int datalen
)
618 unsigned char *metadata
;
625 metadata
= malloc(datalen
);
627 memcpy(metadata
, data
, datalen
);
629 /* Set metadata in thread instead */
630 MUTEX_LOCK(raop_rtp
->run_mutex
);
631 raop_rtp
->metadata
= metadata
;
632 raop_rtp
->metadata_len
= datalen
;
633 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Post new cover art for the receiver thread.
 *
 * A private copy of `data` (`datalen` bytes) is made with malloc + memcpy
 * and stored in the session under run_mutex; raop_rtp_process_events
 * later detaches it and passes it to the audio_set_coverart callback.
 *
 * NOTE(review): validation of `datalen` and of the malloc result is not
 * present in this view.
 * NOTE(review): the previous raop_rtp->coverart pointer is overwritten
 * without a visible free, so a buffer that is still pending when this is
 * called again would leak - confirm. */
637 raop_rtp_set_coverart(raop_rtp_t
*raop_rtp
, const char *data
, int datalen
)
639 unsigned char *coverart
;
646 coverart
= malloc(datalen
);
648 memcpy(coverart
, data
, datalen
);
650 /* Set coverart in thread instead */
651 MUTEX_LOCK(raop_rtp
->run_mutex
);
652 raop_rtp
->coverart
= coverart
;
653 raop_rtp
->coverart_len
= datalen
;
654 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Post a flush request for the receiver thread.
 *
 * `next_seq` is stored in the session's `flush` field under run_mutex;
 * raop_rtp_process_events later passes it to raop_buffer_flush and calls
 * the audio_flush callback.
 *
 * NOTE(review): a `next_seq` equal to NO_FLUSH (-42) cannot be told apart
 * from "no flush pending" and would be ignored. */
658 raop_rtp_flush(raop_rtp_t
*raop_rtp
, int next_seq
)
662 /* Call flush in thread instead */
663 MUTEX_LOCK(raop_rtp
->run_mutex
);
664 raop_rtp
->flush
= next_seq
;
665 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Stop the receiver thread of a session and release its sockets.
 *
 * Under run_mutex the function checks that the session is running and not
 * yet joined, then clears `running` so that raop_rtp_process_events sees
 * the stop request.  With the mutex released it joins the thread, closes
 * every socket whose descriptor is not -1, resets the packet buffer with
 * raop_buffer_flush(buffer, -1), and finally marks the session as joined
 * (again under run_mutex) so that raop_rtp_start accepts it again.
 *
 * NOTE(review): the early-return statement of the first branch is not
 * present in this view. */
669 raop_rtp_stop(raop_rtp_t
*raop_rtp
)
673 /* Check that we are running and thread is not
674 * joined (should never be while still running) */
675 MUTEX_LOCK(raop_rtp
->run_mutex
);
676 if (!raop_rtp
->running
|| raop_rtp
->joined
) {
677 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
680 raop_rtp
->running
= 0;
681 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
683 /* Join the thread */
/* The mutex is released first because the thread locks it in raop_rtp_process_events */
684 THREAD_JOIN(raop_rtp
->thread
);
685 if (raop_rtp
->csock
!= -1) closesocket(raop_rtp
->csock
);
686 if (raop_rtp
->tsock
!= -1) closesocket(raop_rtp
->tsock
);
687 if (raop_rtp
->dsock
!= -1) closesocket(raop_rtp
->dsock
);
689 /* Flush buffer into initial state */
690 raop_buffer_flush(raop_rtp
->buffer
, -1);
692 /* Mark thread as joined */
693 MUTEX_LOCK(raop_rtp
->run_mutex
);
694 raop_rtp
->joined
= 1;
695 MUTEX_UNLOCK(raop_rtp
->run_mutex
);