2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
23 #include "raop_buffer.h"
29 #define NO_FLUSH (-42)
33 raop_callbacks_t callbacks
;
35 /* Buffer to handle all resends */
36 raop_buffer_t
*buffer
;
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr
;
40 socklen_t remote_saddr_len
;
42 /* These variables only edited mutex locked */
47 thread_handle_t thread
;
48 mutex_handle_t run_mutex
;
50 /* Remote control and timing ports */
51 unsigned short control_rport
;
52 unsigned short timing_rport
;
54 /* Sockets for control, timing and data */
55 int csock
, tsock
, dsock
;
57 /* Local control, timing and data ports */
58 unsigned short control_lport
;
59 unsigned short timing_lport
;
60 unsigned short data_lport
;
62 /* Initialized after the first control packet */
63 struct sockaddr_storage control_saddr
;
64 socklen_t control_saddr_len
;
65 unsigned short control_seqnum
;
69 raop_rtp_parse_remote(raop_rtp_t
*raop_rtp
, const char *remote
)
79 current
= original
= strdup(remote
);
83 tmpstr
= utils_strsep(¤t
, " ");
84 if (strcmp(tmpstr
, "IN")) {
88 tmpstr
= utils_strsep(¤t
, " ");
89 if (!strcmp(tmpstr
, "IP4") && current
) {
91 } else if (!strcmp(tmpstr
, "IP6") && current
) {
97 if (strstr(current
, ":")) {
98 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
101 ret
= netutils_parse_address(family
, current
,
102 &raop_rtp
->remote_saddr
,
103 sizeof(raop_rtp
->remote_saddr
));
108 raop_rtp
->remote_saddr_len
= ret
;
114 raop_rtp_init(logger_t
*logger
, raop_callbacks_t
*callbacks
, const char *remote
,
115 const char *fmtp
, const unsigned char *aeskey
, const unsigned char *aesiv
)
117 raop_rtp_t
*raop_rtp
;
124 raop_rtp
= calloc(1, sizeof(raop_rtp_t
));
128 raop_rtp
->logger
= logger
;
129 memcpy(&raop_rtp
->callbacks
, callbacks
, sizeof(raop_callbacks_t
));
130 raop_rtp
->buffer
= raop_buffer_init(fmtp
, aeskey
, aesiv
);
131 if (!raop_rtp
->buffer
) {
135 if (raop_rtp_parse_remote(raop_rtp
, remote
) < 0) {
140 raop_rtp
->running
= 0;
141 raop_rtp
->joined
= 1;
142 raop_rtp
->flush
= NO_FLUSH
;
143 MUTEX_CREATE(raop_rtp
->run_mutex
);
149 raop_rtp_destroy(raop_rtp_t
*raop_rtp
)
152 raop_rtp_stop(raop_rtp
);
154 MUTEX_DESTROY(raop_rtp
->run_mutex
);
155 raop_buffer_destroy(raop_rtp
->buffer
);
161 raop_rtp_init_sockets(raop_rtp_t
*raop_rtp
, int use_ipv6
, int use_udp
)
163 int csock
= -1, tsock
= -1, dsock
= -1;
164 unsigned short cport
= 0, tport
= 0, dport
= 0;
169 csock
= netutils_init_socket(&cport
, use_ipv6
, use_udp
);
170 tsock
= netutils_init_socket(&tport
, use_ipv6
, use_udp
);
171 if (csock
== -1 || tsock
== -1) {
172 goto sockets_cleanup
;
175 dsock
= netutils_init_socket(&dport
, use_ipv6
, use_udp
);
177 goto sockets_cleanup
;
180 /* Listen to the data socket if using TCP */
182 if (listen(dsock
, 1) < 0)
183 goto sockets_cleanup
;
186 /* Set socket descriptors */
187 raop_rtp
->csock
= csock
;
188 raop_rtp
->tsock
= tsock
;
189 raop_rtp
->dsock
= dsock
;
191 /* Set port values */
192 raop_rtp
->control_lport
= cport
;
193 raop_rtp
->timing_lport
= tport
;
194 raop_rtp
->data_lport
= dport
;
198 if (csock
!= -1) closesocket(csock
);
199 if (tsock
!= -1) closesocket(tsock
);
200 if (dsock
!= -1) closesocket(dsock
);
205 raop_rtp_resend_callback(void *opaque
, unsigned short seqnum
, unsigned short count
)
207 raop_rtp_t
*raop_rtp
= opaque
;
208 unsigned char packet
[8];
209 unsigned short ourseqnum
;
210 struct sockaddr
*addr
;
214 addr
= (struct sockaddr
*)&raop_rtp
->control_saddr
;
215 addrlen
= raop_rtp
->control_saddr_len
;
217 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got resend request %d %d", seqnum
, count
);
218 ourseqnum
= raop_rtp
->control_seqnum
++;
220 /* Fill the request buffer */
222 packet
[1] = 0x55|0x80;
223 packet
[2] = (ourseqnum
>> 8);
224 packet
[3] = ourseqnum
;
225 packet
[4] = (seqnum
>> 8);
227 packet
[6] = (count
>> 8);
230 ret
= sendto(raop_rtp
->csock
, (const char *)packet
, sizeof(packet
), 0, addr
, addrlen
);
232 logger_log(raop_rtp
->logger
, LOGGER_WARNING
, "Resend failed: %d", SOCKET_GET_ERROR());
239 raop_rtp_thread_udp(void *arg
)
241 raop_rtp_t
*raop_rtp
= arg
;
242 unsigned char packet
[RAOP_PACKET_LEN
];
243 unsigned int packetlen
;
244 struct sockaddr_storage saddr
;
248 const ALACSpecificConfig
*config
;
249 void *cb_data
= NULL
;
253 config
= raop_buffer_get_config(raop_rtp
->buffer
);
254 cb_data
= raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
,
267 MUTEX_LOCK(raop_rtp
->run_mutex
);
268 if (!raop_rtp
->running
) {
269 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
272 /* Read the volume level */
273 volume_changed
= (volume
!= raop_rtp
->volume
);
274 volume
= raop_rtp
->volume
;
276 /* Read the flush value */
277 flush
= raop_rtp
->flush
;
278 raop_rtp
->flush
= NO_FLUSH
;
279 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
281 /* Call set_volume callback if changed */
282 if (volume_changed
) {
283 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
285 if (flush
!= NO_FLUSH
) {
286 raop_buffer_flush(raop_rtp
->buffer
, flush
);
287 raop_rtp
->callbacks
.audio_flush(raop_rtp
->callbacks
.cls
, cb_data
);
290 /* Set timeout value to 5ms */
294 /* Get the correct nfds value */
295 nfds
= raop_rtp
->csock
+1;
296 if (raop_rtp
->tsock
>= nfds
)
297 nfds
= raop_rtp
->tsock
+1;
298 if (raop_rtp
->dsock
>= nfds
)
299 nfds
= raop_rtp
->dsock
+1;
301 /* Set rfds and call select */
303 FD_SET(raop_rtp
->csock
, &rfds
);
304 FD_SET(raop_rtp
->tsock
, &rfds
);
305 FD_SET(raop_rtp
->dsock
, &rfds
);
306 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
308 /* Timeout happened */
310 } else if (ret
== -1) {
311 /* FIXME: Error happened */
315 if (FD_ISSET(raop_rtp
->csock
, &rfds
)) {
316 saddrlen
= sizeof(saddr
);
317 packetlen
= recvfrom(raop_rtp
->csock
, (char *)packet
, sizeof(packet
), 0,
318 (struct sockaddr
*)&saddr
, &saddrlen
);
320 /* Get the destination address here, because we need the sin6_scope_id */
321 memcpy(&raop_rtp
->control_saddr
, &saddr
, saddrlen
);
322 raop_rtp
->control_saddr_len
= saddrlen
;
324 if (packetlen
>= 12) {
325 char type
= packet
[1] & ~0x80;
327 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got control packet of type 0x%02x", type
);
329 /* Handle resent data packet */
330 int ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, packetlen
-4, 1);
334 } else if (FD_ISSET(raop_rtp
->tsock
, &rfds
)) {
335 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Would have timing packet in queue");
336 } else if (FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
337 saddrlen
= sizeof(saddr
);
338 packetlen
= recvfrom(raop_rtp
->dsock
, (char *)packet
, sizeof(packet
), 0,
339 (struct sockaddr
*)&saddr
, &saddrlen
);
340 if (packetlen
>= 12) {
341 int no_resend
= (raop_rtp
->control_rport
== 0);
344 const void *audiobuf
;
347 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
, packetlen
, 1);
350 /* Decode all frames in queue */
351 while ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, no_resend
))) {
352 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
355 /* Handle possible resend requests */
357 raop_buffer_handle_resends(raop_rtp
->buffer
, raop_rtp_resend_callback
, raop_rtp
);
362 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting UDP RAOP thread");
363 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
369 raop_rtp_thread_tcp(void *arg
)
371 raop_rtp_t
*raop_rtp
= arg
;
373 unsigned char packet
[RAOP_PACKET_LEN
];
374 unsigned int packetlen
= 0;
377 const ALACSpecificConfig
*config
;
378 void *cb_data
= NULL
;
382 config
= raop_buffer_get_config(raop_rtp
->buffer
);
383 cb_data
= raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
,
395 MUTEX_LOCK(raop_rtp
->run_mutex
);
396 if (!raop_rtp
->running
) {
397 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
400 volume_changed
= (volume
!= raop_rtp
->volume
);
401 volume
= raop_rtp
->volume
;
402 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
404 /* Call set_volume callback if changed */
405 if (volume_changed
) {
406 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
409 /* Set timeout value to 5ms */
413 /* Get the correct nfds value and set rfds */
415 if (stream_fd
== -1) {
416 FD_SET(raop_rtp
->dsock
, &rfds
);
417 nfds
= raop_rtp
->dsock
+1;
419 FD_SET(stream_fd
, &rfds
);
422 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
424 /* Timeout happened */
426 } else if (ret
== -1) {
427 /* FIXME: Error happened */
428 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in select");
431 if (stream_fd
== -1 && FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
432 struct sockaddr_storage saddr
;
435 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Accepting client");
436 saddrlen
= sizeof(saddr
);
437 stream_fd
= accept(raop_rtp
->dsock
, (struct sockaddr
*)&saddr
, &saddrlen
);
438 if (stream_fd
== -1) {
439 /* FIXME: Error happened */
440 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in accept %d %s", errno
, strerror(errno
));
444 if (stream_fd
!= -1 && FD_ISSET(stream_fd
, &rfds
)) {
445 unsigned int rtplen
=0;
447 const void *audiobuf
;
450 ret
= recv(stream_fd
, (char *)(packet
+packetlen
), sizeof(packet
)-packetlen
, 0);
452 /* TCP socket closed */
453 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "TCP socket closed");
455 } else if (ret
== -1) {
456 /* FIXME: Error happened */
457 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in recv");
462 /* Check that we have enough bytes */
466 if (packet
[0] != '$' || packet
[1] != '\0') {
467 /* FIXME: Incorrect RTP magic bytes */
470 rtplen
= (packet
[2] << 8) | packet
[3];
471 if (rtplen
> sizeof(packet
)) {
472 /* FIXME: Too long packet */
473 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error, packet too long %d", rtplen
);
476 if (packetlen
< 4+rtplen
) {
480 /* Packet is valid, process it */
481 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, rtplen
, 0);
484 /* Remove processed bytes from packet buffer */
485 memmove(packet
, packet
+4+rtplen
, packetlen
-rtplen
);
486 packetlen
-= 4+rtplen
;
488 /* Decode the received frame */
489 if ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, 1))) {
490 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
495 /* Close the stream file descriptor */
496 if (stream_fd
!= -1) {
497 closesocket(stream_fd
);
500 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting TCP RAOP thread");
501 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
507 raop_rtp_start(raop_rtp_t
*raop_rtp
, int use_udp
, unsigned short control_rport
, unsigned short timing_rport
,
508 unsigned short *control_lport
, unsigned short *timing_lport
, unsigned short *data_lport
)
514 MUTEX_LOCK(raop_rtp
->run_mutex
);
515 if (raop_rtp
->running
|| !raop_rtp
->joined
) {
516 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
520 /* Initialize ports and sockets */
521 raop_rtp
->control_rport
= control_rport
;
522 raop_rtp
->timing_rport
= timing_rport
;
523 if (raop_rtp
->remote_saddr
.ss_family
== AF_INET6
) {
526 if (raop_rtp_init_sockets(raop_rtp
, use_ipv6
, use_udp
) < 0) {
527 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Initializing sockets failed");
528 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
531 if (control_lport
) *control_lport
= raop_rtp
->control_lport
;
532 if (timing_lport
) *timing_lport
= raop_rtp
->timing_lport
;
533 if (data_lport
) *data_lport
= raop_rtp
->data_lport
;
535 /* Create the thread and initialize running values */
536 raop_rtp
->running
= 1;
537 raop_rtp
->joined
= 0;
539 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_udp
, raop_rtp
);
541 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_tcp
, raop_rtp
);
543 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
547 raop_rtp_set_volume(raop_rtp_t
*raop_rtp
, float volume
)
553 } else if (volume
< -144.0f
) {
557 /* Set volume in thread instead */
558 MUTEX_LOCK(raop_rtp
->run_mutex
);
559 raop_rtp
->volume
= volume
;
560 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
564 raop_rtp_flush(raop_rtp_t
*raop_rtp
, int next_seq
)
568 /* Call flush in thread instead */
569 MUTEX_LOCK(raop_rtp
->run_mutex
);
570 raop_rtp
->flush
= next_seq
;
571 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
575 raop_rtp_stop(raop_rtp_t
*raop_rtp
)
579 /* Check that we are running and thread is not
580 * joined (should never be while still running) */
581 MUTEX_LOCK(raop_rtp
->run_mutex
);
582 if (!raop_rtp
->running
|| raop_rtp
->joined
) {
583 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
586 raop_rtp
->running
= 0;
587 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
589 /* Join the thread */
590 THREAD_JOIN(raop_rtp
->thread
);
591 if (raop_rtp
->csock
!= -1) closesocket(raop_rtp
->csock
);
592 if (raop_rtp
->tsock
!= -1) closesocket(raop_rtp
->tsock
);
593 if (raop_rtp
->dsock
!= -1) closesocket(raop_rtp
->dsock
);
595 /* Flush buffer into initial state */
596 raop_buffer_flush(raop_rtp
->buffer
, -1);
598 /* Mark thread as joined */
599 MUTEX_LOCK(raop_rtp
->run_mutex
);
600 raop_rtp
->joined
= 1;
601 MUTEX_UNLOCK(raop_rtp
->run_mutex
);