Review and clean up raop.c a little bit
[deb_shairplay.git] / src / lib / raop_rtp.c
CommitLineData
23e7e3ae
JVH
1/**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
2340bcd3
JVH
15#include <stdlib.h>
16#include <stdio.h>
17#include <string.h>
18#include <assert.h>
19#include <errno.h>
20
21#include "raop_rtp.h"
22#include "raop.h"
23#include "raop_buffer.h"
24#include "netutils.h"
25#include "compat.h"
26#include "logger.h"
27
28#define NO_FLUSH (-42)
29
30struct raop_rtp_s {
31 logger_t *logger;
32 raop_callbacks_t callbacks;
33
34 raop_buffer_t *buffer;
35
36 /* These variables only edited mutex locked */
37 int running;
38 int joined;
39 float volume;
40 int flush;
41 thread_handle_t thread;
42 mutex_handle_t run_mutex;
43
44 /* Remote control and timing ports */
45 unsigned short control_rport;
46 unsigned short timing_rport;
47
48 /* Sockets for control, timing and data */
49 int csock, tsock, dsock;
50
51 /* Local control, timing and data ports */
52 unsigned short control_lport;
53 unsigned short timing_lport;
54 unsigned short data_lport;
55
56 struct sockaddr_storage control_saddr;
57 socklen_t control_saddr_len;
58 unsigned short control_seqnum;
59};
60
61raop_rtp_t *
62raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *fmtp,
63 const unsigned char *aeskey, const unsigned char *aesiv)
64{
65 raop_rtp_t *raop_rtp;
66
67 assert(logger);
68
69 raop_rtp = calloc(1, sizeof(raop_rtp_t));
70 if (!raop_rtp) {
71 return NULL;
72 }
73 raop_rtp->logger = logger;
74 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
75 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
76 if (!raop_rtp->buffer) {
77 free(raop_rtp);
78 return NULL;
79 }
80
81 raop_rtp->running = 0;
82 raop_rtp->joined = 1;
83 raop_rtp->flush = NO_FLUSH;
84 MUTEX_CREATE(raop_rtp->run_mutex);
85
86 return raop_rtp;
87}
88
89void
90raop_rtp_destroy(raop_rtp_t *raop_rtp)
91{
92 if (raop_rtp) {
93 raop_rtp_stop(raop_rtp);
94
95 MUTEX_DESTROY(raop_rtp->run_mutex);
96 raop_buffer_destroy(raop_rtp->buffer);
97 free(raop_rtp);
98 }
99}
100
101static int
102raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
103{
104 int csock = -1, tsock = -1, dsock = -1;
105 unsigned short cport = 0, tport = 0, dport = 0;
106
107 assert(raop_rtp);
108
109 if (use_udp) {
110 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
111 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
112 if (csock == -1 || tsock == -1) {
113 goto sockets_cleanup;
114 }
115 }
116 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
117 if (dsock == -1) {
118 goto sockets_cleanup;
119 }
120
121 /* Listen to the data socket if using TCP */
122 if (!use_udp) {
123 if (listen(dsock, 1) < 0)
124 goto sockets_cleanup;
125 }
126
127 /* Set socket descriptors */
128 raop_rtp->csock = csock;
129 raop_rtp->tsock = tsock;
130 raop_rtp->dsock = dsock;
131
132 /* Set port values */
133 raop_rtp->control_lport = cport;
134 raop_rtp->timing_lport = tport;
135 raop_rtp->data_lport = dport;
136 return 0;
137
138sockets_cleanup:
139 if (csock != -1) closesocket(csock);
140 if (tsock != -1) closesocket(tsock);
141 if (dsock != -1) closesocket(dsock);
142 return -1;
143}
144
145static int
146raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
147{
148 raop_rtp_t *raop_rtp = opaque;
149 unsigned char packet[8];
150 unsigned short ourseqnum;
151 struct sockaddr *addr;
152 socklen_t addrlen;
153
154 addr = (struct sockaddr *)&raop_rtp->control_saddr;
155 addrlen = raop_rtp->control_saddr_len;
156
14003408 157 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d\n", seqnum, count);
2340bcd3
JVH
158 ourseqnum = raop_rtp->control_seqnum++;
159
160 /* Fill the request buffer */
161 packet[0] = 0x80;
162 packet[1] = 0x55|0x80;
163 packet[2] = (ourseqnum >> 8);
164 packet[3] = ourseqnum;
165 packet[4] = (seqnum >> 8);
166 packet[5] = seqnum;
167 packet[6] = (count >> 8);
168 packet[7] = count;
169
170 sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
171 return 0;
172}
173
174static THREAD_RETVAL
175raop_rtp_thread_udp(void *arg)
176{
177 raop_rtp_t *raop_rtp = arg;
178 unsigned char packet[RAOP_PACKET_LEN];
179 unsigned int packetlen;
180 struct sockaddr_storage saddr;
181 socklen_t saddrlen;
aaa2c733 182 float volume = 0.0;
2340bcd3
JVH
183
184 const ALACSpecificConfig *config;
185 void *cb_data = NULL;
186
187 assert(raop_rtp);
188
189 config = raop_buffer_get_config(raop_rtp->buffer);
190 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
191 config->bitDepth,
192 config->numChannels,
193 config->sampleRate);
194
195 while(1) {
196 int volume_changed;
2340bcd3
JVH
197 int flush;
198
199 fd_set rfds;
200 struct timeval tv;
201 int nfds, ret;
202
203 MUTEX_LOCK(raop_rtp->run_mutex);
204 if (!raop_rtp->running) {
205 MUTEX_UNLOCK(raop_rtp->run_mutex);
206 break;
207 }
208 /* Read the volume level */
209 volume_changed = (volume != raop_rtp->volume);
210 volume = raop_rtp->volume;
211
212 /* Read the flush value */
213 flush = raop_rtp->flush;
214 raop_rtp->flush = NO_FLUSH;
215 MUTEX_UNLOCK(raop_rtp->run_mutex);
216
217 /* Call set_volume callback if changed */
218 if (volume_changed) {
219 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
220 }
221 if (flush != NO_FLUSH) {
222 raop_buffer_flush(raop_rtp->buffer, flush);
223 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
224 }
225
226 /* Set timeout value to 5ms */
227 tv.tv_sec = 0;
228 tv.tv_usec = 5000;
229
230 /* Get the correct nfds value */
231 nfds = raop_rtp->csock+1;
232 if (raop_rtp->tsock >= nfds)
233 nfds = raop_rtp->tsock+1;
234 if (raop_rtp->dsock >= nfds)
235 nfds = raop_rtp->dsock+1;
236
237 /* Set rfds and call select */
238 FD_ZERO(&rfds);
239 FD_SET(raop_rtp->csock, &rfds);
240 FD_SET(raop_rtp->tsock, &rfds);
241 FD_SET(raop_rtp->dsock, &rfds);
242 ret = select(nfds, &rfds, NULL, NULL, &tv);
243 if (ret == 0) {
244 /* Timeout happened */
245 continue;
246 } else if (ret == -1) {
247 /* FIXME: Error happened */
248 break;
249 }
250
251 if (FD_ISSET(raop_rtp->csock, &rfds)) {
252 saddrlen = sizeof(saddr);
253 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
254 (struct sockaddr *)&saddr, &saddrlen);
255
256 /* FIXME: Get destination address here */
257 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
258 raop_rtp->control_saddr_len = saddrlen;
259
260 if (packetlen >= 12) {
261 char type = packet[1] & ~0x80;
262
14003408 263 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x\n", type);
2340bcd3
JVH
264 if (type == 0x56) {
265 /* Handle resent data packet */
266 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
267 assert(ret >= 0);
268 }
269 }
270 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
271 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue\n");
272 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
273 saddrlen = sizeof(saddr);
274 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
275 (struct sockaddr *)&saddr, &saddrlen);
276 if (packetlen >= 12) {
277 int no_resend = (raop_rtp->control_rport == 0);
278 int ret;
279
280 const void *audiobuf;
281 int audiobuflen;
282
283 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
284 assert(ret >= 0);
285
286 /* Decode all frames in queue */
287 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
288 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
289 }
290
291 /* Handle possible resend requests */
292 if (!no_resend) {
293 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
294 }
295 }
296 }
297 }
298 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
299 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
300
301 return 0;
302}
303
304static THREAD_RETVAL
305raop_rtp_thread_tcp(void *arg)
306{
307 raop_rtp_t *raop_rtp = arg;
308 int stream_fd = -1;
309 unsigned char packet[RAOP_PACKET_LEN];
310 unsigned int packetlen = 0;
aaa2c733 311 float volume = 0.0;
2340bcd3
JVH
312
313 const ALACSpecificConfig *config;
314 void *cb_data = NULL;
315
316 assert(raop_rtp);
317
318 config = raop_buffer_get_config(raop_rtp->buffer);
319 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
320 config->bitDepth,
321 config->numChannels,
322 config->sampleRate);
323
324 while (1) {
325 int volume_changed;
2340bcd3
JVH
326
327 fd_set rfds;
328 struct timeval tv;
329 int nfds, ret;
330
331 MUTEX_LOCK(raop_rtp->run_mutex);
332 if (!raop_rtp->running) {
333 MUTEX_UNLOCK(raop_rtp->run_mutex);
334 break;
335 }
336 volume_changed = (volume != raop_rtp->volume);
337 volume = raop_rtp->volume;
338 MUTEX_UNLOCK(raop_rtp->run_mutex);
339
340 /* Call set_volume callback if changed */
341 if (volume_changed) {
342 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
343 }
344
345 /* Set timeout value to 5ms */
346 tv.tv_sec = 0;
347 tv.tv_usec = 5000;
348
349 /* Get the correct nfds value and set rfds */
350 FD_ZERO(&rfds);
351 if (stream_fd == -1) {
352 FD_SET(raop_rtp->dsock, &rfds);
353 nfds = raop_rtp->dsock+1;
354 } else {
355 FD_SET(stream_fd, &rfds);
356 nfds = stream_fd+1;
357 }
358 ret = select(nfds, &rfds, NULL, NULL, &tv);
359 if (ret == 0) {
360 /* Timeout happened */
361 continue;
362 } else if (ret == -1) {
363 /* FIXME: Error happened */
364 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select\n");
365 break;
366 }
367 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
368 struct sockaddr_storage saddr;
369 socklen_t saddrlen;
370
371 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client\n");
372 saddrlen = sizeof(saddr);
373 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
374 if (stream_fd == -1) {
375 /* FIXME: Error happened */
376 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s\n", errno, strerror(errno));
377 break;
378 }
379 }
380 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
381 unsigned int rtplen=0;
2340bcd3
JVH
382
383 const void *audiobuf;
384 int audiobuflen;
385
386 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
387 if (ret == 0) {
388 /* TCP socket closed */
389 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed\n");
390 break;
391 } else if (ret == -1) {
392 /* FIXME: Error happened */
393 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv\n");
394 break;
395 }
396 packetlen += ret;
397
398 /* Check that we have enough bytes */
399 if (packetlen < 4) {
400 continue;
401 }
402 if (packet[0] != '$' || packet[1] != '\0') {
403 /* FIXME: Incorrect RTP magic bytes */
404 break;
405 }
406 rtplen = (packet[2] << 8) | packet[3];
407 if (rtplen > sizeof(packet)) {
408 /* FIXME: Too long packet */
409 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d\n", rtplen);
410 break;
411 }
412 if (packetlen < 4+rtplen) {
413 continue;
414 }
415
416 /* Packet is valid, process it */
2340bcd3
JVH
417 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
418 assert(ret >= 0);
419
420 /* Remove processed bytes from packet buffer */
421 memmove(packet, packet+4+rtplen, packetlen-rtplen);
422 packetlen -= 4+rtplen;
423
424 /* Decode the received frame */
425 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
426 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
427 }
428 }
429 }
430
431 /* Close the stream file descriptor */
432 if (stream_fd != -1) {
433 closesocket(stream_fd);
434 }
435
436 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
437 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
438
439 return 0;
440}
441
442void
443raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
444 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
445{
446 assert(raop_rtp);
447
448 MUTEX_LOCK(raop_rtp->run_mutex);
449 if (raop_rtp->running || !raop_rtp->joined) {
450 MUTEX_UNLOCK(raop_rtp->run_mutex);
451 return;
452 }
453
454 /* Initialize ports and sockets */
455 raop_rtp->control_rport = control_rport;
456 raop_rtp->timing_rport = timing_rport;
457 if (raop_rtp_init_sockets(raop_rtp, 1, use_udp) < 0) {
458 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed\n");
459 MUTEX_UNLOCK(raop_rtp->run_mutex);
460 return;
461 }
462 if (control_lport) *control_lport = raop_rtp->control_lport;
463 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
464 if (data_lport) *data_lport = raop_rtp->data_lport;
465
466 /* Create the thread and initialize running values */
467 raop_rtp->running = 1;
468 raop_rtp->joined = 0;
469 if (use_udp) {
470 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
471 } else {
472 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
473 }
474 MUTEX_UNLOCK(raop_rtp->run_mutex);
475}
476
477void
478raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
479{
480 assert(raop_rtp);
481
482 if (volume > 0.0f) {
483 volume = 0.0f;
484 } else if (volume < -144.0f) {
485 volume = -144.0f;
486 }
487
488 /* Set volume in thread instead */
489 MUTEX_LOCK(raop_rtp->run_mutex);
490 raop_rtp->volume = volume;
491 MUTEX_UNLOCK(raop_rtp->run_mutex);
492}
493
494void
495raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
496{
497 assert(raop_rtp);
498
499 /* Call flush in thread instead */
500 MUTEX_LOCK(raop_rtp->run_mutex);
501 raop_rtp->flush = next_seq;
502 MUTEX_UNLOCK(raop_rtp->run_mutex);
503}
504
505void
506raop_rtp_stop(raop_rtp_t *raop_rtp)
507{
508 assert(raop_rtp);
509
510 /* Check that we are running and thread is not
511 * joined (should never be while still running) */
512 MUTEX_LOCK(raop_rtp->run_mutex);
513 if (!raop_rtp->running || raop_rtp->joined) {
514 MUTEX_UNLOCK(raop_rtp->run_mutex);
515 return;
516 }
517 raop_rtp->running = 0;
518 MUTEX_UNLOCK(raop_rtp->run_mutex);
519
520 /* Join the thread */
521 THREAD_JOIN(raop_rtp->thread);
522 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
523 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
524 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
525
526 /* Flush buffer into initial state */
527 raop_buffer_flush(raop_rtp->buffer, -1);
528
529 /* Mark thread as joined */
530 MUTEX_LOCK(raop_rtp->run_mutex);
531 raop_rtp->joined = 1;
532 MUTEX_UNLOCK(raop_rtp->run_mutex);
533}