3400c0c9169e65679b0ec573cb6899bf5573b2eb
[deb_shairplay.git] / src / lib / raop_rtp.c
1 /**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <assert.h>
19 #include <errno.h>
20
21 #include "raop_rtp.h"
22 #include "raop.h"
23 #include "raop_buffer.h"
24 #include "netutils.h"
25 #include "utils.h"
26 #include "compat.h"
27 #include "logger.h"
28
29 #define NO_FLUSH (-42)
30
31 struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
35 /* Buffer to handle all resends */
36 raop_buffer_t *buffer;
37
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
42 /* These variables only edited mutex locked */
43 int running;
44 int joined;
45 float volume;
46 int flush;
47 thread_handle_t thread;
48 mutex_handle_t run_mutex;
49
50 /* Remote control and timing ports */
51 unsigned short control_rport;
52 unsigned short timing_rport;
53
54 /* Sockets for control, timing and data */
55 int csock, tsock, dsock;
56
57 /* Local control, timing and data ports */
58 unsigned short control_lport;
59 unsigned short timing_lport;
60 unsigned short data_lport;
61
62 /* Initialized after the first control packet */
63 struct sockaddr_storage control_saddr;
64 socklen_t control_saddr_len;
65 unsigned short control_seqnum;
66 };
67
68 static int
69 raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
70 {
71 char *original;
72 char *current;
73 char *tmpstr;
74 int family;
75 int ret;
76
77 assert(raop_rtp);
78
79 current = original = strdup(remote);
80 if (!original) {
81 return -1;
82 }
83 tmpstr = utils_strsep(&current, " ");
84 if (strcmp(tmpstr, "IN")) {
85 free(original);
86 return -1;
87 }
88 tmpstr = utils_strsep(&current, " ");
89 if (!strcmp(tmpstr, "IP4") && current) {
90 family = AF_INET;
91 } else if (!strcmp(tmpstr, "IP6") && current) {
92 family = AF_INET6;
93 } else {
94 free(original);
95 return -1;
96 }
97 ret = netutils_parse_address(family, current,
98 &raop_rtp->remote_saddr,
99 sizeof(raop_rtp->remote_saddr));
100 if (ret < 0) {
101 free(original);
102 return -1;
103 }
104 raop_rtp->remote_saddr_len = ret;
105 free(original);
106 return 0;
107 }
108
109 raop_rtp_t *
110 raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
111 const char *fmtp, const unsigned char *aeskey, const unsigned char *aesiv)
112 {
113 raop_rtp_t *raop_rtp;
114
115 assert(logger);
116 assert(callbacks);
117 assert(remote);
118 assert(fmtp);
119
120 raop_rtp = calloc(1, sizeof(raop_rtp_t));
121 if (!raop_rtp) {
122 return NULL;
123 }
124 raop_rtp->logger = logger;
125 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
126 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
127 if (!raop_rtp->buffer) {
128 free(raop_rtp);
129 return NULL;
130 }
131 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
132 free(raop_rtp);
133 return NULL;
134 }
135
136 raop_rtp->running = 0;
137 raop_rtp->joined = 1;
138 raop_rtp->flush = NO_FLUSH;
139 MUTEX_CREATE(raop_rtp->run_mutex);
140
141 return raop_rtp;
142 }
143
144 void
145 raop_rtp_destroy(raop_rtp_t *raop_rtp)
146 {
147 if (raop_rtp) {
148 raop_rtp_stop(raop_rtp);
149
150 MUTEX_DESTROY(raop_rtp->run_mutex);
151 raop_buffer_destroy(raop_rtp->buffer);
152 free(raop_rtp);
153 }
154 }
155
156 static int
157 raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
158 {
159 int csock = -1, tsock = -1, dsock = -1;
160 unsigned short cport = 0, tport = 0, dport = 0;
161
162 assert(raop_rtp);
163
164 if (use_udp) {
165 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
166 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
167 if (csock == -1 || tsock == -1) {
168 goto sockets_cleanup;
169 }
170 }
171 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
172 if (dsock == -1) {
173 goto sockets_cleanup;
174 }
175
176 /* Listen to the data socket if using TCP */
177 if (!use_udp) {
178 if (listen(dsock, 1) < 0)
179 goto sockets_cleanup;
180 }
181
182 /* Set socket descriptors */
183 raop_rtp->csock = csock;
184 raop_rtp->tsock = tsock;
185 raop_rtp->dsock = dsock;
186
187 /* Set port values */
188 raop_rtp->control_lport = cport;
189 raop_rtp->timing_lport = tport;
190 raop_rtp->data_lport = dport;
191 return 0;
192
193 sockets_cleanup:
194 if (csock != -1) closesocket(csock);
195 if (tsock != -1) closesocket(tsock);
196 if (dsock != -1) closesocket(dsock);
197 return -1;
198 }
199
200 static int
201 raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
202 {
203 raop_rtp_t *raop_rtp = opaque;
204 unsigned char packet[8];
205 unsigned short ourseqnum;
206 struct sockaddr *addr;
207 socklen_t addrlen;
208 int ret;
209
210 addr = (struct sockaddr *)&raop_rtp->control_saddr;
211 addrlen = raop_rtp->control_saddr_len;
212
213 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
214 ourseqnum = raop_rtp->control_seqnum++;
215
216 /* Fill the request buffer */
217 packet[0] = 0x80;
218 packet[1] = 0x55|0x80;
219 packet[2] = (ourseqnum >> 8);
220 packet[3] = ourseqnum;
221 packet[4] = (seqnum >> 8);
222 packet[5] = seqnum;
223 packet[6] = (count >> 8);
224 packet[7] = count;
225
226 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
227 if (ret == -1) {
228 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
229 }
230
231 return 0;
232 }
233
234 static THREAD_RETVAL
235 raop_rtp_thread_udp(void *arg)
236 {
237 raop_rtp_t *raop_rtp = arg;
238 unsigned char packet[RAOP_PACKET_LEN];
239 unsigned int packetlen;
240 struct sockaddr_storage saddr;
241 socklen_t saddrlen;
242 float volume = 0.0;
243
244 const ALACSpecificConfig *config;
245 void *cb_data = NULL;
246
247 assert(raop_rtp);
248
249 config = raop_buffer_get_config(raop_rtp->buffer);
250 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
251 config->bitDepth,
252 config->numChannels,
253 config->sampleRate);
254
255 while(1) {
256 int volume_changed;
257 int flush;
258
259 fd_set rfds;
260 struct timeval tv;
261 int nfds, ret;
262
263 MUTEX_LOCK(raop_rtp->run_mutex);
264 if (!raop_rtp->running) {
265 MUTEX_UNLOCK(raop_rtp->run_mutex);
266 break;
267 }
268 /* Read the volume level */
269 volume_changed = (volume != raop_rtp->volume);
270 volume = raop_rtp->volume;
271
272 /* Read the flush value */
273 flush = raop_rtp->flush;
274 raop_rtp->flush = NO_FLUSH;
275 MUTEX_UNLOCK(raop_rtp->run_mutex);
276
277 /* Call set_volume callback if changed */
278 if (volume_changed) {
279 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
280 }
281 if (flush != NO_FLUSH) {
282 raop_buffer_flush(raop_rtp->buffer, flush);
283 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
284 }
285
286 /* Set timeout value to 5ms */
287 tv.tv_sec = 0;
288 tv.tv_usec = 5000;
289
290 /* Get the correct nfds value */
291 nfds = raop_rtp->csock+1;
292 if (raop_rtp->tsock >= nfds)
293 nfds = raop_rtp->tsock+1;
294 if (raop_rtp->dsock >= nfds)
295 nfds = raop_rtp->dsock+1;
296
297 /* Set rfds and call select */
298 FD_ZERO(&rfds);
299 FD_SET(raop_rtp->csock, &rfds);
300 FD_SET(raop_rtp->tsock, &rfds);
301 FD_SET(raop_rtp->dsock, &rfds);
302 ret = select(nfds, &rfds, NULL, NULL, &tv);
303 if (ret == 0) {
304 /* Timeout happened */
305 continue;
306 } else if (ret == -1) {
307 /* FIXME: Error happened */
308 break;
309 }
310
311 if (FD_ISSET(raop_rtp->csock, &rfds)) {
312 saddrlen = sizeof(saddr);
313 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
314 (struct sockaddr *)&saddr, &saddrlen);
315
316 /* Get the destination address here, because we need the sin6_scope_id */
317 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
318 raop_rtp->control_saddr_len = saddrlen;
319
320 if (packetlen >= 12) {
321 char type = packet[1] & ~0x80;
322
323 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
324 if (type == 0x56) {
325 /* Handle resent data packet */
326 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
327 assert(ret >= 0);
328 }
329 }
330 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
331 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
332 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
333 saddrlen = sizeof(saddr);
334 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
335 (struct sockaddr *)&saddr, &saddrlen);
336 if (packetlen >= 12) {
337 int no_resend = (raop_rtp->control_rport == 0);
338 int ret;
339
340 const void *audiobuf;
341 int audiobuflen;
342
343 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
344 assert(ret >= 0);
345
346 /* Decode all frames in queue */
347 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
348 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
349 }
350
351 /* Handle possible resend requests */
352 if (!no_resend) {
353 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
354 }
355 }
356 }
357 }
358 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
359 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
360
361 return 0;
362 }
363
364 static THREAD_RETVAL
365 raop_rtp_thread_tcp(void *arg)
366 {
367 raop_rtp_t *raop_rtp = arg;
368 int stream_fd = -1;
369 unsigned char packet[RAOP_PACKET_LEN];
370 unsigned int packetlen = 0;
371 float volume = 0.0;
372
373 const ALACSpecificConfig *config;
374 void *cb_data = NULL;
375
376 assert(raop_rtp);
377
378 config = raop_buffer_get_config(raop_rtp->buffer);
379 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
380 config->bitDepth,
381 config->numChannels,
382 config->sampleRate);
383
384 while (1) {
385 int volume_changed;
386
387 fd_set rfds;
388 struct timeval tv;
389 int nfds, ret;
390
391 MUTEX_LOCK(raop_rtp->run_mutex);
392 if (!raop_rtp->running) {
393 MUTEX_UNLOCK(raop_rtp->run_mutex);
394 break;
395 }
396 volume_changed = (volume != raop_rtp->volume);
397 volume = raop_rtp->volume;
398 MUTEX_UNLOCK(raop_rtp->run_mutex);
399
400 /* Call set_volume callback if changed */
401 if (volume_changed) {
402 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
403 }
404
405 /* Set timeout value to 5ms */
406 tv.tv_sec = 0;
407 tv.tv_usec = 5000;
408
409 /* Get the correct nfds value and set rfds */
410 FD_ZERO(&rfds);
411 if (stream_fd == -1) {
412 FD_SET(raop_rtp->dsock, &rfds);
413 nfds = raop_rtp->dsock+1;
414 } else {
415 FD_SET(stream_fd, &rfds);
416 nfds = stream_fd+1;
417 }
418 ret = select(nfds, &rfds, NULL, NULL, &tv);
419 if (ret == 0) {
420 /* Timeout happened */
421 continue;
422 } else if (ret == -1) {
423 /* FIXME: Error happened */
424 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
425 break;
426 }
427 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
428 struct sockaddr_storage saddr;
429 socklen_t saddrlen;
430
431 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
432 saddrlen = sizeof(saddr);
433 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
434 if (stream_fd == -1) {
435 /* FIXME: Error happened */
436 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
437 break;
438 }
439 }
440 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
441 unsigned int rtplen=0;
442
443 const void *audiobuf;
444 int audiobuflen;
445
446 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
447 if (ret == 0) {
448 /* TCP socket closed */
449 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
450 break;
451 } else if (ret == -1) {
452 /* FIXME: Error happened */
453 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
454 break;
455 }
456 packetlen += ret;
457
458 /* Check that we have enough bytes */
459 if (packetlen < 4) {
460 continue;
461 }
462 if (packet[0] != '$' || packet[1] != '\0') {
463 /* FIXME: Incorrect RTP magic bytes */
464 break;
465 }
466 rtplen = (packet[2] << 8) | packet[3];
467 if (rtplen > sizeof(packet)) {
468 /* FIXME: Too long packet */
469 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
470 break;
471 }
472 if (packetlen < 4+rtplen) {
473 continue;
474 }
475
476 /* Packet is valid, process it */
477 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
478 assert(ret >= 0);
479
480 /* Remove processed bytes from packet buffer */
481 memmove(packet, packet+4+rtplen, packetlen-rtplen);
482 packetlen -= 4+rtplen;
483
484 /* Decode the received frame */
485 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
486 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
487 }
488 }
489 }
490
491 /* Close the stream file descriptor */
492 if (stream_fd != -1) {
493 closesocket(stream_fd);
494 }
495
496 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
497 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
498
499 return 0;
500 }
501
502 void
503 raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
504 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
505 {
506 int use_ipv6 = 0;
507
508 assert(raop_rtp);
509
510 MUTEX_LOCK(raop_rtp->run_mutex);
511 if (raop_rtp->running || !raop_rtp->joined) {
512 MUTEX_UNLOCK(raop_rtp->run_mutex);
513 return;
514 }
515
516 /* Initialize ports and sockets */
517 raop_rtp->control_rport = control_rport;
518 raop_rtp->timing_rport = timing_rport;
519 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
520 use_ipv6 = 1;
521 }
522 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
523 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
524 MUTEX_UNLOCK(raop_rtp->run_mutex);
525 return;
526 }
527 if (control_lport) *control_lport = raop_rtp->control_lport;
528 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
529 if (data_lport) *data_lport = raop_rtp->data_lport;
530
531 /* Create the thread and initialize running values */
532 raop_rtp->running = 1;
533 raop_rtp->joined = 0;
534 if (use_udp) {
535 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
536 } else {
537 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
538 }
539 MUTEX_UNLOCK(raop_rtp->run_mutex);
540 }
541
542 void
543 raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
544 {
545 assert(raop_rtp);
546
547 if (volume > 0.0f) {
548 volume = 0.0f;
549 } else if (volume < -144.0f) {
550 volume = -144.0f;
551 }
552
553 /* Set volume in thread instead */
554 MUTEX_LOCK(raop_rtp->run_mutex);
555 raop_rtp->volume = volume;
556 MUTEX_UNLOCK(raop_rtp->run_mutex);
557 }
558
559 void
560 raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
561 {
562 assert(raop_rtp);
563
564 /* Call flush in thread instead */
565 MUTEX_LOCK(raop_rtp->run_mutex);
566 raop_rtp->flush = next_seq;
567 MUTEX_UNLOCK(raop_rtp->run_mutex);
568 }
569
570 void
571 raop_rtp_stop(raop_rtp_t *raop_rtp)
572 {
573 assert(raop_rtp);
574
575 /* Check that we are running and thread is not
576 * joined (should never be while still running) */
577 MUTEX_LOCK(raop_rtp->run_mutex);
578 if (!raop_rtp->running || raop_rtp->joined) {
579 MUTEX_UNLOCK(raop_rtp->run_mutex);
580 return;
581 }
582 raop_rtp->running = 0;
583 MUTEX_UNLOCK(raop_rtp->run_mutex);
584
585 /* Join the thread */
586 THREAD_JOIN(raop_rtp->thread);
587 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
588 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
589 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
590
591 /* Flush buffer into initial state */
592 raop_buffer_flush(raop_rtp->buffer, -1);
593
594 /* Mark thread as joined */
595 MUTEX_LOCK(raop_rtp->run_mutex);
596 raop_rtp->joined = 1;
597 MUTEX_UNLOCK(raop_rtp->run_mutex);
598 }