Move src to src/lib, include to src/include, test to src/test.
[deb_shairplay.git] / src / lib / raop_rtp.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <assert.h>
5 #include <errno.h>
6
7 #include "raop_rtp.h"
8 #include "raop.h"
9 #include "raop_buffer.h"
10 #include "netutils.h"
11 #include "compat.h"
12 #include "logger.h"
13
14 #define NO_FLUSH (-42)
15
16 struct raop_rtp_s {
17 logger_t *logger;
18 raop_callbacks_t callbacks;
19
20 raop_buffer_t *buffer;
21
22 /* These variables only edited mutex locked */
23 int running;
24 int joined;
25 float volume;
26 int flush;
27 thread_handle_t thread;
28 mutex_handle_t run_mutex;
29
30 /* Remote control and timing ports */
31 unsigned short control_rport;
32 unsigned short timing_rport;
33
34 /* Sockets for control, timing and data */
35 int csock, tsock, dsock;
36
37 /* Local control, timing and data ports */
38 unsigned short control_lport;
39 unsigned short timing_lport;
40 unsigned short data_lport;
41
42 struct sockaddr_storage control_saddr;
43 socklen_t control_saddr_len;
44 unsigned short control_seqnum;
45 };
46
47 raop_rtp_t *
48 raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *fmtp,
49 const unsigned char *aeskey, const unsigned char *aesiv)
50 {
51 raop_rtp_t *raop_rtp;
52
53 assert(logger);
54
55 raop_rtp = calloc(1, sizeof(raop_rtp_t));
56 if (!raop_rtp) {
57 return NULL;
58 }
59 raop_rtp->logger = logger;
60 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
61 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
62 if (!raop_rtp->buffer) {
63 free(raop_rtp);
64 return NULL;
65 }
66
67 raop_rtp->running = 0;
68 raop_rtp->joined = 1;
69 raop_rtp->flush = NO_FLUSH;
70 MUTEX_CREATE(raop_rtp->run_mutex);
71
72 return raop_rtp;
73 }
74
75 void
76 raop_rtp_destroy(raop_rtp_t *raop_rtp)
77 {
78 if (raop_rtp) {
79 raop_rtp_stop(raop_rtp);
80
81 MUTEX_DESTROY(raop_rtp->run_mutex);
82 raop_buffer_destroy(raop_rtp->buffer);
83 free(raop_rtp);
84 }
85 }
86
87 static int
88 raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
89 {
90 int csock = -1, tsock = -1, dsock = -1;
91 unsigned short cport = 0, tport = 0, dport = 0;
92
93 assert(raop_rtp);
94
95 if (use_udp) {
96 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
97 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
98 if (csock == -1 || tsock == -1) {
99 goto sockets_cleanup;
100 }
101 }
102 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
103 if (dsock == -1) {
104 goto sockets_cleanup;
105 }
106
107 /* Listen to the data socket if using TCP */
108 if (!use_udp) {
109 if (listen(dsock, 1) < 0)
110 goto sockets_cleanup;
111 }
112
113 /* Set socket descriptors */
114 raop_rtp->csock = csock;
115 raop_rtp->tsock = tsock;
116 raop_rtp->dsock = dsock;
117
118 /* Set port values */
119 raop_rtp->control_lport = cport;
120 raop_rtp->timing_lport = tport;
121 raop_rtp->data_lport = dport;
122 return 0;
123
124 sockets_cleanup:
125 if (csock != -1) closesocket(csock);
126 if (tsock != -1) closesocket(tsock);
127 if (dsock != -1) closesocket(dsock);
128 return -1;
129 }
130
131 static int
132 raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
133 {
134 raop_rtp_t *raop_rtp = opaque;
135 unsigned char packet[8];
136 unsigned short ourseqnum;
137 struct sockaddr *addr;
138 socklen_t addrlen;
139
140 addr = (struct sockaddr *)&raop_rtp->control_saddr;
141 addrlen = raop_rtp->control_saddr_len;
142
143 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d\n", seqnum, count);
144 ourseqnum = raop_rtp->control_seqnum++;
145
146 /* Fill the request buffer */
147 packet[0] = 0x80;
148 packet[1] = 0x55|0x80;
149 packet[2] = (ourseqnum >> 8);
150 packet[3] = ourseqnum;
151 packet[4] = (seqnum >> 8);
152 packet[5] = seqnum;
153 packet[6] = (count >> 8);
154 packet[7] = count;
155
156 sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
157 return 0;
158 }
159
160 static THREAD_RETVAL
161 raop_rtp_thread_udp(void *arg)
162 {
163 raop_rtp_t *raop_rtp = arg;
164 unsigned char packet[RAOP_PACKET_LEN];
165 unsigned int packetlen;
166 struct sockaddr_storage saddr;
167 socklen_t saddrlen;
168
169 const ALACSpecificConfig *config;
170 void *cb_data = NULL;
171
172 assert(raop_rtp);
173
174 config = raop_buffer_get_config(raop_rtp->buffer);
175 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
176 config->bitDepth,
177 config->numChannels,
178 config->sampleRate);
179
180 while(1) {
181 int volume_changed;
182 float volume = 0.0;
183 int flush;
184
185 fd_set rfds;
186 struct timeval tv;
187 int nfds, ret;
188
189 MUTEX_LOCK(raop_rtp->run_mutex);
190 if (!raop_rtp->running) {
191 MUTEX_UNLOCK(raop_rtp->run_mutex);
192 break;
193 }
194 /* Read the volume level */
195 volume_changed = (volume != raop_rtp->volume);
196 volume = raop_rtp->volume;
197
198 /* Read the flush value */
199 flush = raop_rtp->flush;
200 raop_rtp->flush = NO_FLUSH;
201 MUTEX_UNLOCK(raop_rtp->run_mutex);
202
203 /* Call set_volume callback if changed */
204 if (volume_changed) {
205 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
206 }
207 if (flush != NO_FLUSH) {
208 raop_buffer_flush(raop_rtp->buffer, flush);
209 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
210 }
211
212 /* Set timeout value to 5ms */
213 tv.tv_sec = 0;
214 tv.tv_usec = 5000;
215
216 /* Get the correct nfds value */
217 nfds = raop_rtp->csock+1;
218 if (raop_rtp->tsock >= nfds)
219 nfds = raop_rtp->tsock+1;
220 if (raop_rtp->dsock >= nfds)
221 nfds = raop_rtp->dsock+1;
222
223 /* Set rfds and call select */
224 FD_ZERO(&rfds);
225 FD_SET(raop_rtp->csock, &rfds);
226 FD_SET(raop_rtp->tsock, &rfds);
227 FD_SET(raop_rtp->dsock, &rfds);
228 ret = select(nfds, &rfds, NULL, NULL, &tv);
229 if (ret == 0) {
230 /* Timeout happened */
231 continue;
232 } else if (ret == -1) {
233 /* FIXME: Error happened */
234 break;
235 }
236
237 if (FD_ISSET(raop_rtp->csock, &rfds)) {
238 saddrlen = sizeof(saddr);
239 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
240 (struct sockaddr *)&saddr, &saddrlen);
241
242 /* FIXME: Get destination address here */
243 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
244 raop_rtp->control_saddr_len = saddrlen;
245
246 if (packetlen >= 12) {
247 char type = packet[1] & ~0x80;
248
249 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x\n", type);
250 if (type == 0x56) {
251 /* Handle resent data packet */
252 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
253 assert(ret >= 0);
254 }
255 }
256 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
257 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue\n");
258 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
259 saddrlen = sizeof(saddr);
260 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
261 (struct sockaddr *)&saddr, &saddrlen);
262 if (packetlen >= 12) {
263 int no_resend = (raop_rtp->control_rport == 0);
264 int ret;
265
266 const void *audiobuf;
267 int audiobuflen;
268
269 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
270 assert(ret >= 0);
271
272 /* Decode all frames in queue */
273 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
274 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
275 }
276
277 /* Handle possible resend requests */
278 if (!no_resend) {
279 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
280 }
281 }
282 }
283 }
284 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
285 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
286
287 return 0;
288 }
289
290 static THREAD_RETVAL
291 raop_rtp_thread_tcp(void *arg)
292 {
293 raop_rtp_t *raop_rtp = arg;
294 int stream_fd = -1;
295 unsigned char packet[RAOP_PACKET_LEN];
296 unsigned int packetlen = 0;
297
298 const ALACSpecificConfig *config;
299 void *cb_data = NULL;
300
301 assert(raop_rtp);
302
303 config = raop_buffer_get_config(raop_rtp->buffer);
304 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
305 config->bitDepth,
306 config->numChannels,
307 config->sampleRate);
308
309 while (1) {
310 int volume_changed;
311 float volume = 0.0;
312
313 fd_set rfds;
314 struct timeval tv;
315 int nfds, ret;
316
317 MUTEX_LOCK(raop_rtp->run_mutex);
318 if (!raop_rtp->running) {
319 MUTEX_UNLOCK(raop_rtp->run_mutex);
320 break;
321 }
322 volume_changed = (volume != raop_rtp->volume);
323 volume = raop_rtp->volume;
324 MUTEX_UNLOCK(raop_rtp->run_mutex);
325
326 /* Call set_volume callback if changed */
327 if (volume_changed) {
328 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
329 }
330
331 /* Set timeout value to 5ms */
332 tv.tv_sec = 0;
333 tv.tv_usec = 5000;
334
335 /* Get the correct nfds value and set rfds */
336 FD_ZERO(&rfds);
337 if (stream_fd == -1) {
338 FD_SET(raop_rtp->dsock, &rfds);
339 nfds = raop_rtp->dsock+1;
340 } else {
341 FD_SET(stream_fd, &rfds);
342 nfds = stream_fd+1;
343 }
344 ret = select(nfds, &rfds, NULL, NULL, &tv);
345 if (ret == 0) {
346 /* Timeout happened */
347 continue;
348 } else if (ret == -1) {
349 /* FIXME: Error happened */
350 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select\n");
351 break;
352 }
353 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
354 struct sockaddr_storage saddr;
355 socklen_t saddrlen;
356
357 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client\n");
358 saddrlen = sizeof(saddr);
359 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
360 if (stream_fd == -1) {
361 /* FIXME: Error happened */
362 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s\n", errno, strerror(errno));
363 break;
364 }
365 }
366 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
367 unsigned int rtplen=0;
368 char type;
369
370 const void *audiobuf;
371 int audiobuflen;
372
373 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
374 if (ret == 0) {
375 /* TCP socket closed */
376 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed\n");
377 break;
378 } else if (ret == -1) {
379 /* FIXME: Error happened */
380 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv\n");
381 break;
382 }
383 packetlen += ret;
384
385 /* Check that we have enough bytes */
386 if (packetlen < 4) {
387 continue;
388 }
389 if (packet[0] != '$' || packet[1] != '\0') {
390 /* FIXME: Incorrect RTP magic bytes */
391 break;
392 }
393 rtplen = (packet[2] << 8) | packet[3];
394 if (rtplen > sizeof(packet)) {
395 /* FIXME: Too long packet */
396 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d\n", rtplen);
397 break;
398 }
399 if (packetlen < 4+rtplen) {
400 continue;
401 }
402
403 /* Packet is valid, process it */
404 type = packet[4+1] & ~0x80;
405 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
406 assert(ret >= 0);
407
408 /* Remove processed bytes from packet buffer */
409 memmove(packet, packet+4+rtplen, packetlen-rtplen);
410 packetlen -= 4+rtplen;
411
412 /* Decode the received frame */
413 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
414 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
415 }
416 }
417 }
418
419 /* Close the stream file descriptor */
420 if (stream_fd != -1) {
421 closesocket(stream_fd);
422 }
423
424 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
425 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
426
427 return 0;
428 }
429
430 void
431 raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
432 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
433 {
434 assert(raop_rtp);
435
436 MUTEX_LOCK(raop_rtp->run_mutex);
437 if (raop_rtp->running || !raop_rtp->joined) {
438 MUTEX_UNLOCK(raop_rtp->run_mutex);
439 return;
440 }
441
442 /* Initialize ports and sockets */
443 raop_rtp->control_rport = control_rport;
444 raop_rtp->timing_rport = timing_rport;
445 if (raop_rtp_init_sockets(raop_rtp, 1, use_udp) < 0) {
446 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed\n");
447 MUTEX_UNLOCK(raop_rtp->run_mutex);
448 return;
449 }
450 if (control_lport) *control_lport = raop_rtp->control_lport;
451 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
452 if (data_lport) *data_lport = raop_rtp->data_lport;
453
454 /* Create the thread and initialize running values */
455 raop_rtp->running = 1;
456 raop_rtp->joined = 0;
457 if (use_udp) {
458 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
459 } else {
460 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
461 }
462 MUTEX_UNLOCK(raop_rtp->run_mutex);
463 }
464
465 void
466 raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
467 {
468 assert(raop_rtp);
469
470 if (volume > 0.0f) {
471 volume = 0.0f;
472 } else if (volume < -144.0f) {
473 volume = -144.0f;
474 }
475
476 /* Set volume in thread instead */
477 MUTEX_LOCK(raop_rtp->run_mutex);
478 raop_rtp->volume = volume;
479 MUTEX_UNLOCK(raop_rtp->run_mutex);
480 }
481
482 void
483 raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
484 {
485 assert(raop_rtp);
486
487 /* Call flush in thread instead */
488 MUTEX_LOCK(raop_rtp->run_mutex);
489 raop_rtp->flush = next_seq;
490 MUTEX_UNLOCK(raop_rtp->run_mutex);
491 }
492
493 void
494 raop_rtp_stop(raop_rtp_t *raop_rtp)
495 {
496 assert(raop_rtp);
497
498 /* Check that we are running and thread is not
499 * joined (should never be while still running) */
500 MUTEX_LOCK(raop_rtp->run_mutex);
501 if (!raop_rtp->running || raop_rtp->joined) {
502 MUTEX_UNLOCK(raop_rtp->run_mutex);
503 return;
504 }
505 raop_rtp->running = 0;
506 MUTEX_UNLOCK(raop_rtp->run_mutex);
507
508 /* Join the thread */
509 THREAD_JOIN(raop_rtp->thread);
510 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
511 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
512 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
513
514 /* Flush buffer into initial state */
515 raop_buffer_flush(raop_rtp->buffer, -1);
516
517 /* Mark thread as joined */
518 MUTEX_LOCK(raop_rtp->run_mutex);
519 raop_rtp->joined = 1;
520 MUTEX_UNLOCK(raop_rtp->run_mutex);
521 }