9 #include "raop_buffer.h"
14 #define NO_FLUSH (-42)
18 raop_callbacks_t callbacks
;
20 raop_buffer_t
*buffer
;
22 /* These variables only edited mutex locked */
27 thread_handle_t thread
;
28 mutex_handle_t run_mutex
;
30 /* Remote control and timing ports */
31 unsigned short control_rport
;
32 unsigned short timing_rport
;
34 /* Sockets for control, timing and data */
35 int csock
, tsock
, dsock
;
37 /* Local control, timing and data ports */
38 unsigned short control_lport
;
39 unsigned short timing_lport
;
40 unsigned short data_lport
;
42 struct sockaddr_storage control_saddr
;
43 socklen_t control_saddr_len
;
44 unsigned short control_seqnum
;
48 raop_rtp_init(logger_t
*logger
, raop_callbacks_t
*callbacks
, const char *fmtp
,
49 const unsigned char *aeskey
, const unsigned char *aesiv
)
55 raop_rtp
= calloc(1, sizeof(raop_rtp_t
));
59 raop_rtp
->logger
= logger
;
60 memcpy(&raop_rtp
->callbacks
, callbacks
, sizeof(raop_callbacks_t
));
61 raop_rtp
->buffer
= raop_buffer_init(fmtp
, aeskey
, aesiv
);
62 if (!raop_rtp
->buffer
) {
67 raop_rtp
->running
= 0;
69 raop_rtp
->flush
= NO_FLUSH
;
70 MUTEX_CREATE(raop_rtp
->run_mutex
);
76 raop_rtp_destroy(raop_rtp_t
*raop_rtp
)
79 raop_rtp_stop(raop_rtp
);
81 MUTEX_DESTROY(raop_rtp
->run_mutex
);
82 raop_buffer_destroy(raop_rtp
->buffer
);
88 raop_rtp_init_sockets(raop_rtp_t
*raop_rtp
, int use_ipv6
, int use_udp
)
90 int csock
= -1, tsock
= -1, dsock
= -1;
91 unsigned short cport
= 0, tport
= 0, dport
= 0;
96 csock
= netutils_init_socket(&cport
, use_ipv6
, use_udp
);
97 tsock
= netutils_init_socket(&tport
, use_ipv6
, use_udp
);
98 if (csock
== -1 || tsock
== -1) {
102 dsock
= netutils_init_socket(&dport
, use_ipv6
, use_udp
);
104 goto sockets_cleanup
;
107 /* Listen to the data socket if using TCP */
109 if (listen(dsock
, 1) < 0)
110 goto sockets_cleanup
;
113 /* Set socket descriptors */
114 raop_rtp
->csock
= csock
;
115 raop_rtp
->tsock
= tsock
;
116 raop_rtp
->dsock
= dsock
;
118 /* Set port values */
119 raop_rtp
->control_lport
= cport
;
120 raop_rtp
->timing_lport
= tport
;
121 raop_rtp
->data_lport
= dport
;
125 if (csock
!= -1) closesocket(csock
);
126 if (tsock
!= -1) closesocket(tsock
);
127 if (dsock
!= -1) closesocket(dsock
);
132 raop_rtp_resend_callback(void *opaque
, unsigned short seqnum
, unsigned short count
)
134 raop_rtp_t
*raop_rtp
= opaque
;
135 unsigned char packet
[8];
136 unsigned short ourseqnum
;
137 struct sockaddr
*addr
;
140 addr
= (struct sockaddr
*)&raop_rtp
->control_saddr
;
141 addrlen
= raop_rtp
->control_saddr_len
;
143 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Got resend request %d %d\n", seqnum
, count
);
144 ourseqnum
= raop_rtp
->control_seqnum
++;
146 /* Fill the request buffer */
148 packet
[1] = 0x55|0x80;
149 packet
[2] = (ourseqnum
>> 8);
150 packet
[3] = ourseqnum
;
151 packet
[4] = (seqnum
>> 8);
153 packet
[6] = (count
>> 8);
156 sendto(raop_rtp
->csock
, (const char *)packet
, sizeof(packet
), 0, addr
, addrlen
);
161 raop_rtp_thread_udp(void *arg
)
163 raop_rtp_t
*raop_rtp
= arg
;
164 unsigned char packet
[RAOP_PACKET_LEN
];
165 unsigned int packetlen
;
166 struct sockaddr_storage saddr
;
169 const ALACSpecificConfig
*config
;
170 void *cb_data
= NULL
;
174 config
= raop_buffer_get_config(raop_rtp
->buffer
);
175 raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
, &cb_data
,
189 MUTEX_LOCK(raop_rtp
->run_mutex
);
190 if (!raop_rtp
->running
) {
191 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
194 /* Read the volume level */
195 volume_changed
= (volume
!= raop_rtp
->volume
);
196 volume
= raop_rtp
->volume
;
198 /* Read the flush value */
199 flush
= raop_rtp
->flush
;
200 raop_rtp
->flush
= NO_FLUSH
;
201 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
203 /* Call set_volume callback if changed */
204 if (volume_changed
) {
205 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
207 if (flush
!= NO_FLUSH
) {
208 raop_buffer_flush(raop_rtp
->buffer
, flush
);
209 raop_rtp
->callbacks
.audio_flush(raop_rtp
->callbacks
.cls
, cb_data
);
212 /* Set timeout value to 5ms */
216 /* Get the correct nfds value */
217 nfds
= raop_rtp
->csock
+1;
218 if (raop_rtp
->tsock
>= nfds
)
219 nfds
= raop_rtp
->tsock
+1;
220 if (raop_rtp
->dsock
>= nfds
)
221 nfds
= raop_rtp
->dsock
+1;
223 /* Set rfds and call select */
225 FD_SET(raop_rtp
->csock
, &rfds
);
226 FD_SET(raop_rtp
->tsock
, &rfds
);
227 FD_SET(raop_rtp
->dsock
, &rfds
);
228 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
230 /* Timeout happened */
232 } else if (ret
== -1) {
233 /* FIXME: Error happened */
237 if (FD_ISSET(raop_rtp
->csock
, &rfds
)) {
238 saddrlen
= sizeof(saddr
);
239 packetlen
= recvfrom(raop_rtp
->csock
, (char *)packet
, sizeof(packet
), 0,
240 (struct sockaddr
*)&saddr
, &saddrlen
);
242 /* FIXME: Get destination address here */
243 memcpy(&raop_rtp
->control_saddr
, &saddr
, saddrlen
);
244 raop_rtp
->control_saddr_len
= saddrlen
;
246 if (packetlen
>= 12) {
247 char type
= packet
[1] & ~0x80;
249 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Got control packet of type 0x%02x\n", type
);
251 /* Handle resent data packet */
252 int ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, packetlen
-4, 1);
256 } else if (FD_ISSET(raop_rtp
->tsock
, &rfds
)) {
257 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Would have timing packet in queue\n");
258 } else if (FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
259 saddrlen
= sizeof(saddr
);
260 packetlen
= recvfrom(raop_rtp
->dsock
, (char *)packet
, sizeof(packet
), 0,
261 (struct sockaddr
*)&saddr
, &saddrlen
);
262 if (packetlen
>= 12) {
263 int no_resend
= (raop_rtp
->control_rport
== 0);
266 const void *audiobuf
;
269 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
, packetlen
, 1);
272 /* Decode all frames in queue */
273 while ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, no_resend
))) {
274 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
277 /* Handle possible resend requests */
279 raop_buffer_handle_resends(raop_rtp
->buffer
, raop_rtp_resend_callback
, raop_rtp
);
284 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting thread\n");
285 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
291 raop_rtp_thread_tcp(void *arg
)
293 raop_rtp_t
*raop_rtp
= arg
;
295 unsigned char packet
[RAOP_PACKET_LEN
];
296 unsigned int packetlen
= 0;
298 const ALACSpecificConfig
*config
;
299 void *cb_data
= NULL
;
303 config
= raop_buffer_get_config(raop_rtp
->buffer
);
304 raop_rtp
->callbacks
.audio_init(raop_rtp
->callbacks
.cls
, &cb_data
,
317 MUTEX_LOCK(raop_rtp
->run_mutex
);
318 if (!raop_rtp
->running
) {
319 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
322 volume_changed
= (volume
!= raop_rtp
->volume
);
323 volume
= raop_rtp
->volume
;
324 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
326 /* Call set_volume callback if changed */
327 if (volume_changed
) {
328 raop_rtp
->callbacks
.audio_set_volume(raop_rtp
->callbacks
.cls
, cb_data
, volume
);
331 /* Set timeout value to 5ms */
335 /* Get the correct nfds value and set rfds */
337 if (stream_fd
== -1) {
338 FD_SET(raop_rtp
->dsock
, &rfds
);
339 nfds
= raop_rtp
->dsock
+1;
341 FD_SET(stream_fd
, &rfds
);
344 ret
= select(nfds
, &rfds
, NULL
, NULL
, &tv
);
346 /* Timeout happened */
348 } else if (ret
== -1) {
349 /* FIXME: Error happened */
350 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in select\n");
353 if (stream_fd
== -1 && FD_ISSET(raop_rtp
->dsock
, &rfds
)) {
354 struct sockaddr_storage saddr
;
357 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Accepting client\n");
358 saddrlen
= sizeof(saddr
);
359 stream_fd
= accept(raop_rtp
->dsock
, (struct sockaddr
*)&saddr
, &saddrlen
);
360 if (stream_fd
== -1) {
361 /* FIXME: Error happened */
362 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in accept %d %s\n", errno
, strerror(errno
));
366 if (stream_fd
!= -1 && FD_ISSET(stream_fd
, &rfds
)) {
367 unsigned int rtplen
=0;
370 const void *audiobuf
;
373 ret
= recv(stream_fd
, (char *)(packet
+packetlen
), sizeof(packet
)-packetlen
, 0);
375 /* TCP socket closed */
376 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "TCP socket closed\n");
378 } else if (ret
== -1) {
379 /* FIXME: Error happened */
380 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error in recv\n");
385 /* Check that we have enough bytes */
389 if (packet
[0] != '$' || packet
[1] != '\0') {
390 /* FIXME: Incorrect RTP magic bytes */
393 rtplen
= (packet
[2] << 8) | packet
[3];
394 if (rtplen
> sizeof(packet
)) {
395 /* FIXME: Too long packet */
396 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Error, packet too long %d\n", rtplen
);
399 if (packetlen
< 4+rtplen
) {
403 /* Packet is valid, process it */
404 type
= packet
[4+1] & ~0x80;
405 ret
= raop_buffer_queue(raop_rtp
->buffer
, packet
+4, rtplen
, 0);
408 /* Remove processed bytes from packet buffer */
409 memmove(packet
, packet
+4+rtplen
, packetlen
-rtplen
);
410 packetlen
-= 4+rtplen
;
412 /* Decode the received frame */
413 if ((audiobuf
= raop_buffer_dequeue(raop_rtp
->buffer
, &audiobuflen
, 1))) {
414 raop_rtp
->callbacks
.audio_process(raop_rtp
->callbacks
.cls
, cb_data
, audiobuf
, audiobuflen
);
419 /* Close the stream file descriptor */
420 if (stream_fd
!= -1) {
421 closesocket(stream_fd
);
424 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Exiting thread\n");
425 raop_rtp
->callbacks
.audio_destroy(raop_rtp
->callbacks
.cls
, cb_data
);
431 raop_rtp_start(raop_rtp_t
*raop_rtp
, int use_udp
, unsigned short control_rport
, unsigned short timing_rport
,
432 unsigned short *control_lport
, unsigned short *timing_lport
, unsigned short *data_lport
)
436 MUTEX_LOCK(raop_rtp
->run_mutex
);
437 if (raop_rtp
->running
|| !raop_rtp
->joined
) {
438 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
442 /* Initialize ports and sockets */
443 raop_rtp
->control_rport
= control_rport
;
444 raop_rtp
->timing_rport
= timing_rport
;
445 if (raop_rtp_init_sockets(raop_rtp
, 1, use_udp
) < 0) {
446 logger_log(raop_rtp
->logger
, LOGGER_INFO
, "Initializing sockets failed\n");
447 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
450 if (control_lport
) *control_lport
= raop_rtp
->control_lport
;
451 if (timing_lport
) *timing_lport
= raop_rtp
->timing_lport
;
452 if (data_lport
) *data_lport
= raop_rtp
->data_lport
;
454 /* Create the thread and initialize running values */
455 raop_rtp
->running
= 1;
456 raop_rtp
->joined
= 0;
458 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_udp
, raop_rtp
);
460 THREAD_CREATE(raop_rtp
->thread
, raop_rtp_thread_tcp
, raop_rtp
);
462 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
466 raop_rtp_set_volume(raop_rtp_t
*raop_rtp
, float volume
)
472 } else if (volume
< -144.0f
) {
476 /* Set volume in thread instead */
477 MUTEX_LOCK(raop_rtp
->run_mutex
);
478 raop_rtp
->volume
= volume
;
479 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
483 raop_rtp_flush(raop_rtp_t
*raop_rtp
, int next_seq
)
487 /* Call flush in thread instead */
488 MUTEX_LOCK(raop_rtp
->run_mutex
);
489 raop_rtp
->flush
= next_seq
;
490 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
494 raop_rtp_stop(raop_rtp_t
*raop_rtp
)
498 /* Check that we are running and thread is not
499 * joined (should never be while still running) */
500 MUTEX_LOCK(raop_rtp
->run_mutex
);
501 if (!raop_rtp
->running
|| raop_rtp
->joined
) {
502 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
505 raop_rtp
->running
= 0;
506 MUTEX_UNLOCK(raop_rtp
->run_mutex
);
508 /* Join the thread */
509 THREAD_JOIN(raop_rtp
->thread
);
510 if (raop_rtp
->csock
!= -1) closesocket(raop_rtp
->csock
);
511 if (raop_rtp
->tsock
!= -1) closesocket(raop_rtp
->tsock
);
512 if (raop_rtp
->dsock
!= -1) closesocket(raop_rtp
->dsock
);
514 /* Flush buffer into initial state */
515 raop_buffer_flush(raop_rtp
->buffer
, -1);
517 /* Mark thread as joined */
518 MUTEX_LOCK(raop_rtp
->run_mutex
);
519 raop_rtp
->joined
= 1;
520 MUTEX_UNLOCK(raop_rtp
->run_mutex
);