2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
23 #include "raop_buffer.h"
/* Sentinel stored in raop_rtp->flush when no flush is pending: the init code
 * sets the field to it, and the UDP worker compares the value it read against
 * it before calling raop_buffer_flush(). */
28 #define NO_FLUSH (-42)
/* Members of the per-session RTP receiver state.
 * NOTE(review): the enclosing struct header and several members (e.g. the
 * run-state variables used elsewhere in this file) are not visible in this
 * view. */
/* Callback table; raop_rtp_init() copies it in by value with memcpy */
32 raop_callbacks_t callbacks
;
/* Packet buffer created by raop_buffer_init() */
34 raop_buffer_t
*buffer
;
36 /* These variables are only edited while the mutex is locked */
/* Worker thread handle and the mutex held around run-state updates */
41 thread_handle_t thread
;
42 mutex_handle_t run_mutex
;
44 /* Remote control and timing ports */
45 unsigned short control_rport
;
46 unsigned short timing_rport
;
48 /* Sockets for control, timing and data */
49 int csock
, tsock
, dsock
;
51 /* Local control, timing and data ports */
52 unsigned short control_lport
;
53 unsigned short timing_lport
;
54 unsigned short data_lport
;
/* Source address of the last packet received on the control socket; the UDP
 * worker overwrites it and the resend callback uses it as sendto() target */
56 struct sockaddr_storage control_saddr
;
57 socklen_t control_saddr_len
;
/* Sequence number written into outgoing resend requests (post-incremented
 * once per request) */
58 unsigned short control_seqnum
;
/* Creates and initialises an RTP receiver object.
 *
 * Allocates a zeroed raop_rtp_t, stores the logger pointer, copies the
 * callback table by value, creates the packet buffer from fmtp/aeskey/aesiv
 * and initialises the run state, the flush sentinel and run_mutex.
 *
 * NOTE(review): the return type, the return statements and the failure
 * handling are not visible in this view; presumably the new object is
 * returned, or NULL on failure -- confirm against the full file. */
62 raop_rtp_init(logger_t
*logger
, raop_callbacks_t
*callbacks
, const char *fmtp
,
63 const unsigned char *aeskey
, const unsigned char *aesiv
)
/* calloc leaves every field that is not explicitly set below at zero */
69 raop_rtp
= calloc(1, sizeof(raop_rtp_t
));
73 raop_rtp
->logger
= logger
;
/* The callback table is copied, so the caller's struct need not outlive us */
74 memcpy(&raop_rtp
->callbacks
, callbacks
, sizeof(raop_callbacks_t
));
75 raop_rtp
->buffer
= raop_buffer_init(fmtp
, aeskey
, aesiv
);
/* Buffer creation failed. NOTE(review): the body of this branch is not
 * visible here; verify that raop_rtp is released before returning. */
76 if (!raop_rtp
->buffer
) {
81 raop_rtp
->running
= 0;
/* No flush is pending on a fresh object */
83 raop_rtp
->flush
= NO_FLUSH
;
84 MUTEX_CREATE(raop_rtp
->run_mutex
);
/* Tears down an RTP receiver: stops it via raop_rtp_stop(), then destroys
 * run_mutex and the packet buffer.
 *
 * NOTE(review): neither a NULL check on raop_rtp nor a free() of the object
 * itself is visible in this view -- confirm both exist in the full file. */
90 raop_rtp_destroy(raop_rtp_t
*raop_rtp
)
/* Stop first so the worker thread is no longer using the mutex or buffer */
93 raop_rtp_stop(raop_rtp
);
95 MUTEX_DESTROY(raop_rtp
->run_mutex
);
96 raop_buffer_destroy(raop_rtp
->buffer
);
/* Opens the control, timing and data sockets for a session.
 *
 * Each socket is created with netutils_init_socket(), which also receives the
 * address of a port variable (presumably filled with the locally bound port
 * -- confirm against netutils). On success the descriptors and ports are
 * stored in raop_rtp. On any failure control jumps to sockets_cleanup, which
 * closes whichever sockets were opened.
 *
 * NOTE(review): the return statements are not visible in this view; the
 * caller in raop_rtp_start() treats a negative result as failure. */
102 raop_rtp_init_sockets(raop_rtp_t
*raop_rtp
, int use_ipv6
, int use_udp
)
/* -1 marks "not opened" so the cleanup path knows what to close */
104 int csock
= -1, tsock
= -1, dsock
= -1;
105 unsigned short cport
= 0, tport
= 0, dport
= 0;
110 csock
= netutils_init_socket(&cport
, use_ipv6
, use_udp
);
111 tsock
= netutils_init_socket(&tport
, use_ipv6
, use_udp
);
112 if (csock
== -1 || tsock
== -1) {
113 goto sockets_cleanup
;
116 dsock
= netutils_init_socket(&dport
, use_ipv6
, use_udp
);
/* NOTE(review): the condition guarding this goto is not visible here;
 * presumably it is taken when dsock == -1 -- confirm. */
118 goto sockets_cleanup
;
121 /* Listen to the data socket if using TCP */
123 if (listen(dsock
, 1) < 0)
124 goto sockets_cleanup
;
127 /* Set socket descriptors */
128 raop_rtp
->csock
= csock
;
129 raop_rtp
->tsock
= tsock
;
130 raop_rtp
->dsock
= dsock
;
132 /* Set port values */
133 raop_rtp
->control_lport
= cport
;
134 raop_rtp
->timing_lport
= tport
;
135 raop_rtp
->data_lport
= dport
;
/* Failure path: close only the sockets that were actually opened */
139 if (csock
!= -1) closesocket(csock
);
140 if (tsock
!= -1) closesocket(tsock
);
141 if (dsock
!= -1) closesocket(dsock
);
/* Resend callback handed to raop_buffer_handle_resends().
 *
 * Builds an 8-byte request on the stack from our own control sequence number,
 * `seqnum` and `count`, and sends it with sendto() on the control socket to
 * the address last stored in control_saddr.
 *
 * opaque: the raop_rtp_t this callback was registered with.
 * seqnum, count: values forwarded into the request; presumably the first
 *   missing sequence number and the number of packets -- confirm against
 *   raop_buffer.
 *
 * NOTE(review): the assignments to packet[0], packet[5] and packet[7], the
 * declaration of addrlen and any check of the sendto() result are not
 * visible in this view. */
146 raop_rtp_resend_callback(void *opaque
, unsigned short seqnum
, unsigned short count
)
148 raop_rtp_t
*raop_rtp
= opaque
;
149 unsigned char packet
[8];
150 unsigned short ourseqnum
;
151 struct sockaddr
*addr
;
/* Destination is whoever last sent us a control packet */
154 addr
= (struct sockaddr
*)&raop_rtp
->control_saddr
;
155 addrlen
= raop_rtp
->control_saddr_len
;
157 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got resend request %d %d\n", seqnum
, count
);
/* Take the current control sequence number, then advance it */
158 ourseqnum
= raop_rtp
->control_seqnum
++;
160 /* Fill the request buffer */
162 packet
[1] = 0x55|0x80;
/* 16-bit values are written high byte first */
163 packet
[2] = (ourseqnum
>> 8);
164 packet
[3] = ourseqnum
;
165 packet
[4] = (seqnum
>> 8);
167 packet
[6] = (count
>> 8);
170 sendto(raop_rtp
->csock
, (const char *)packet
, sizeof(packet
), 0, addr
, addrlen
);
/* Worker thread entry point for UDP transport.
 *
 * Calls the audio_init callback, then loops:
 *   1. Under run_mutex: checks `running`, snapshots the volume and takes the
 *      pending flush value (resetting it to NO_FLUSH).
 *   2. Outside the lock: invokes audio_set_volume when the volume changed,
 *      and raop_buffer_flush() + audio_flush when a flush was requested.
 *   3. select()s on the control, timing and data sockets.
 *   4. Control packets: remembers the sender address and queues the payload
 *      after the first 4 bytes into the buffer. Data packets: queues the
 *      packet, passes every dequeued frame to audio_process and lets the
 *      buffer issue resend requests through raop_rtp_resend_callback.
 * On exit it logs and calls audio_destroy.
 *
 * arg: the raop_rtp_t of the session.
 *
 * NOTE(review): the loop construct, several declarations (volume, flush,
 * nfds, rfds, tv, ret, saddrlen, audiobuflen) and the return statement are
 * not visible in this view. */
175 raop_rtp_thread_udp(void *arg
)
177 raop_rtp_t
*raop_rtp
= arg
;
178 unsigned char packet
[RAOP_PACKET_LEN
];
179 unsigned int packetlen
;
180 struct sockaddr_storage saddr
;
184 const ALACSpecificConfig
*config
;
185 void *cb_data
= NULL
;
189 config
= raop_buffer_get_config(raop_rtp
->buffer
);
190 raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
, &cb_data
,
203 MUTEX_LOCK(raop_rtp
->run_mutex
);
204 if (!raop_rtp
->running
) {
205 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
208 /* Read the volume level */
209 volume_changed
= (volume
!= raop_rtp
->volume
);
210 volume
= raop_rtp
->volume
;
212 /* Read the flush value */
213 flush
= raop_rtp
->flush
;
214 raop_rtp
->flush
= NO_FLUSH
;
215 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
217 /* Call set_volume callback if changed */
218 if (volume_changed
) {
219 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
221 if (flush
!= NO_FLUSH
) {
222 raop_buffer_flush(raop_rtp
->buffer
, flush
);
223 raop_rtp
->callbacks
.audio_flush(raop_rtp
->callbacks
.cls
, cb_data
);
226 /* Set timeout value to 5ms */
230 /* Get the correct nfds value */
231 nfds
= raop_rtp
->csock
+1;
232 if (raop_rtp
->tsock
>= nfds
)
233 nfds
= raop_rtp
->tsock
+1;
234 if (raop_rtp
->dsock
>= nfds
)
235 nfds
= raop_rtp
->dsock
+1;
237 /* Set rfds and call select */
239 FD_SET(raop_rtp
->csock
, &rfds
);
240 FD_SET(raop_rtp
->tsock
, &rfds
);
241 FD_SET(raop_rtp
->dsock
, &rfds
);
242 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
244 /* Timeout happened */
246 } else if (ret
== -1) {
247 /* FIXME: Error happened */
/* The sockets are tested in an if / else-if chain, so at most one ready
 * socket is serviced per select() wake-up */
251 if (FD_ISSET(raop_rtp
->csock
, &rfds
)) {
252 saddrlen
= sizeof(saddr
);
/* NOTE(review): packetlen is unsigned int, so a recvfrom() error return of
 * -1 converts to a huge value and passes the `>= 12` check below; the
 * saddr/saddrlen contents are then also copied without knowing that the call
 * succeeded -- consider checking the signed result first. */
253 packetlen
= recvfrom(raop_rtp
->csock
, (char *)packet
, sizeof(packet
), 0,
254 (struct sockaddr
*)&saddr
, &saddrlen
);
256 /* FIXME: Get destination address here */
257 memcpy(&raop_rtp
->control_saddr
, &saddr
, saddrlen
);
258 raop_rtp
->control_saddr_len
= saddrlen
;
260 if (packetlen
>= 12) {
/* Second header byte with bit 7 masked off */
261 char type
= packet
[1] & ~0x80;
263 logger_log(raop_rtp
->logger
, LOGGER_DEBUG
, "Got control packet of type 0x%02x\n", type
);
265 /* Handle resent data packet */
/* The payload handed to the buffer starts after the first 4 bytes */
266 int ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, packetlen
-4, 1);
270 } else if (FD_ISSET(raop_rtp
->tsock
, &rfds
)) {
271 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Would have timing packet in queue\n");
272 } else if (FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
273 saddrlen
= sizeof(saddr
);
/* NOTE(review): same unsigned conversion of a possible -1 result as on the
 * control socket above. */
274 packetlen
= recvfrom(raop_rtp
->dsock
, (char *)packet
, sizeof(packet
), 0,
275 (struct sockaddr
*)&saddr
, &saddrlen
);
276 if (packetlen
>= 12) {
/* Resends are disabled when no remote control port was supplied */
277 int no_resend
= (raop_rtp
->control_rport
== 0);
280 const void *audiobuf
;
283 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
, packetlen
, 1);
286 /* Decode all frames in queue */
287 while ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, no_resend
))) {
288 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
291 /* Handle possible resend requests */
293 raop_buffer_handle_resends(raop_rtp
->buffer
, raop_rtp_resend_callback
, raop_rtp
);
298 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting thread\n");
299 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
/* Worker thread entry point for TCP transport.
 *
 * Calls the audio_init callback, then loops:
 *   1. Under run_mutex: checks `running` and snapshots the volume.
 *   2. Invokes audio_set_volume when the volume changed.
 *   3. select()s on the listening data socket until a client is accepted,
 *      afterwards on the accepted stream socket.
 *   4. Appends received bytes to `packet` at offset packetlen, parses frames
 *      of the form '$', '\0', 16-bit length (high byte first), payload;
 *      queues each payload into the buffer and passes a dequeued frame to
 *      audio_process.
 * On exit it closes the stream socket, logs and calls audio_destroy.
 *
 * arg: the raop_rtp_t of the session.
 *
 * NOTE(review): the loop construct, several declarations (stream_fd, volume,
 * nfds, rfds, tv, ret, saddrlen, audiobuflen) and the return statement are
 * not visible in this view. No handling of raop_rtp->flush is visible in
 * this thread either -- confirm whether flush requests are meant to be
 * ignored for TCP. */
305 raop_rtp_thread_tcp(void *arg
)
307 raop_rtp_t
*raop_rtp
= arg
;
309 unsigned char packet
[RAOP_PACKET_LEN
];
/* Number of valid, not yet consumed bytes at the start of `packet` */
310 unsigned int packetlen
= 0;
313 const ALACSpecificConfig
*config
;
314 void *cb_data
= NULL
;
318 config
= raop_buffer_get_config(raop_rtp
->buffer
);
319 raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
, &cb_data
,
331 MUTEX_LOCK(raop_rtp
->run_mutex
);
332 if (!raop_rtp
->running
) {
333 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
336 volume_changed
= (volume
!= raop_rtp
->volume
);
337 volume
= raop_rtp
->volume
;
338 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
340 /* Call set_volume callback if changed */
341 if (volume_changed
) {
342 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
345 /* Set timeout value to 5ms */
349 /* Get the correct nfds value and set rfds */
/* Before a client is accepted we wait on the listening socket, afterwards on
 * the accepted stream socket */
351 if (stream_fd
== -1) {
352 FD_SET(raop_rtp
->dsock
, &rfds
);
353 nfds
= raop_rtp
->dsock
+1;
355 FD_SET(stream_fd
, &rfds
);
358 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
360 /* Timeout happened */
362 } else if (ret
== -1) {
363 /* FIXME: Error happened */
364 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in select\n");
367 if (stream_fd
== -1 && FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
368 struct sockaddr_storage saddr
;
371 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Accepting client\n");
372 saddrlen
= sizeof(saddr
);
373 stream_fd
= accept(raop_rtp
->dsock
, (struct sockaddr
*)&saddr
, &saddrlen
);
374 if (stream_fd
== -1) {
375 /* FIXME: Error happened */
376 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in accept %d %s\n", errno
, strerror(errno
));
380 if (stream_fd
!= -1 && FD_ISSET(stream_fd
, &rfds
)) {
381 unsigned int rtplen
=0;
383 const void *audiobuf
;
/* Append to whatever partial frame is already buffered */
386 ret
= recv(stream_fd
, (char *)(packet
+packetlen
), sizeof(packet
)-packetlen
, 0);
388 /* TCP socket closed */
389 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "TCP socket closed\n");
391 } else if (ret
== -1) {
392 /* FIXME: Error happened */
393 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in recv\n");
398 /* Check that we have enough bytes */
402 if (packet
[0] != '$' || packet
[1] != '\0') {
403 /* FIXME: Incorrect RTP magic bytes */
/* Payload length: 16 bits, high byte first */
406 rtplen
= (packet
[2] << 8) | packet
[3];
/* NOTE(review): a whole frame occupies 4+rtplen bytes, but this bound only
 * compares rtplen with the buffer size; a length within 4 bytes of
 * sizeof(packet) passes here yet can never be fully buffered -- consider
 * checking 4+rtplen instead. */
407 if (rtplen
> sizeof(packet
)) {
408 /* FIXME: Too long packet */
409 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error, packet too long %d\n", rtplen
);
/* Frame not complete yet */
412 if (packetlen
< 4+rtplen
) {
416 /* Packet is valid, process it */
417 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, rtplen
, 0);
420 /* Remove processed bytes from packet buffer */
/* NOTE(review): 4+rtplen bytes were consumed, so packetlen-4-rtplen bytes
 * remain, but the length passed here is packetlen-rtplen: 4 bytes more than
 * remain are moved, reading past the valid data (and past the end of `packet`
 * when the buffer is full). Looks like an off-by-4 -- confirm and fix. */
421 memmove(packet
, packet
+4+rtplen
, packetlen
-rtplen
);
422 packetlen
-= 4+rtplen
;
424 /* Decode the received frame */
425 if ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, 1))) {
426 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
431 /* Close the stream file descriptor */
432 if (stream_fd
!= -1) {
433 closesocket(stream_fd
);
436 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting thread\n");
437 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
/* Starts the RTP worker thread for a session.
 *
 * Does nothing further if the session is already running or the previous
 * thread has not been joined. Otherwise, while holding run_mutex, it records
 * the remote control/timing ports, opens the local sockets, reports the local
 * ports through the non-NULL out-parameters, sets running = 1 and joined = 0,
 * and creates the worker thread.
 *
 * use_udp: forwarded to raop_rtp_init_sockets().
 * control_rport, timing_rport: remote ports stored in the session.
 * control_lport, timing_lport, data_lport: optional outputs for local ports.
 *
 * NOTE(review): the return type, the early returns after the unlocks and the
 * condition choosing between the UDP and TCP thread functions are not visible
 * in this view; presumably the choice depends on use_udp -- confirm. */
443 raop_rtp_start(raop_rtp_t
*raop_rtp
, int use_udp
, unsigned short control_rport
, unsigned short timing_rport
,
444 unsigned short *control_lport
, unsigned short *timing_lport
, unsigned short *data_lport
)
448 MUTEX_LOCK(raop_rtp
->run_mutex
);
449 if (raop_rtp
->running
|| !raop_rtp
->joined
) {
450 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
454 /* Initialize ports and sockets */
455 raop_rtp
->control_rport
= control_rport
;
456 raop_rtp
->timing_rport
= timing_rport
;
/* The use_ipv6 argument is hard-coded to 1 here */
457 if (raop_rtp_init_sockets(raop_rtp
, 1, use_udp
) < 0) {
458 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Initializing sockets failed\n");
459 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Out-parameters are optional; NULL pointers are skipped */
462 if (control_lport
) *control_lport
= raop_rtp
->control_lport
;
463 if (timing_lport
) *timing_lport
= raop_rtp
->timing_lport
;
464 if (data_lport
) *data_lport
= raop_rtp
->data_lport
;
466 /* Create the thread and initialize running values */
467 raop_rtp
->running
= 1;
468 raop_rtp
->joined
= 0;
470 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_udp
, raop_rtp
);
472 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_tcp
, raop_rtp
);
474 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Requests a volume change.
 *
 * The value is stored in raop_rtp->volume under run_mutex; the worker thread
 * compares it with its own copy on each iteration and invokes the
 * audio_set_volume callback when it differs.
 *
 * NOTE(review): a range check precedes the store, but only its
 * `volume < -144.0f` branch is visible in this view; the other bound and the
 * replacement values are not shown. */
478 raop_rtp_set_volume(raop_rtp_t
*raop_rtp
, float volume
)
484 } else if (volume
< -144.0f
) {
488 /* Set volume in thread instead */
489 MUTEX_LOCK(raop_rtp
->run_mutex
);
490 raop_rtp
->volume
= volume
;
491 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Requests a buffer flush.
 *
 * Stores next_seq in raop_rtp->flush under run_mutex. The UDP worker thread
 * takes the value on its next iteration, resets the field to NO_FLUSH and
 * passes the value to raop_buffer_flush() followed by the audio_flush
 * callback.
 *
 * NOTE(review): a next_seq equal to NO_FLUSH (-42) would be indistinguishable
 * from "no request"; the valid range of next_seq is not visible here. */
495 raop_rtp_flush(raop_rtp_t
*raop_rtp
, int next_seq
)
499 /* Call flush in thread instead */
500 MUTEX_LOCK(raop_rtp
->run_mutex
);
501 raop_rtp
->flush
= next_seq
;
502 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Stops the worker thread and releases the session's sockets.
 *
 * If the session is not running, or is already joined, the mutex is released
 * and (presumably -- the return is not visible in this view) the function
 * returns. Otherwise it clears `running` under run_mutex, joins the thread,
 * closes the control, timing and data sockets, flushes the buffer with -1 and
 * finally sets `joined` under run_mutex.
 *
 * NOTE(review): the socket fields are closed but no reset to -1 is visible
 * here, so they keep stale descriptor values until raop_rtp_init_sockets()
 * overwrites them -- confirm that nothing reads them in between. */
506 raop_rtp_stop(raop_rtp_t
*raop_rtp
)
510 /* Check that we are running and thread is not
511 * joined (should never be while still running) */
512 MUTEX_LOCK(raop_rtp
->run_mutex
);
513 if (!raop_rtp
->running
|| raop_rtp
->joined
) {
514 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
/* Signal the worker loop to exit; it checks this flag under the same mutex */
517 raop_rtp
->running
= 0;
518 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
520 /* Join the thread */
521 THREAD_JOIN(raop_rtp
->thread
);
/* Sockets are closed only after the thread that used them has finished */
522 if (raop_rtp
->csock
!= -1) closesocket(raop_rtp
->csock
);
523 if (raop_rtp
->tsock
!= -1) closesocket(raop_rtp
->tsock
);
524 if (raop_rtp
->dsock
!= -1) closesocket(raop_rtp
->dsock
);
526 /* Flush buffer into initial state */
527 raop_buffer_flush(raop_rtp
->buffer
, -1);
529 /* Mark thread as joined */
530 MUTEX_LOCK(raop_rtp
->run_mutex
);
531 raop_rtp
->joined
= 1;
532 MUTEX_UNLOCK(raop_rtp
->run_mutex
);