Do not accept new connections if the connection buffer is full.
[deb_shairplay.git] / src / lib / raop_rtp.c
CommitLineData
23e7e3ae
JVH
1/**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
2340bcd3
JVH
15#include <stdlib.h>
16#include <stdio.h>
17#include <string.h>
18#include <assert.h>
19#include <errno.h>
20
21#include "raop_rtp.h"
22#include "raop.h"
23#include "raop_buffer.h"
24#include "netutils.h"
25#include "compat.h"
26#include "logger.h"
27
/* Sentinel stored in raop_rtp_s.flush while no flush request is pending */
28#define NO_FLUSH (-42)
29
30struct raop_rtp_s {
31 logger_t *logger;
32 raop_callbacks_t callbacks;
33
34 raop_buffer_t *buffer;
35
36 /* These variables only edited mutex locked */
37 int running;
38 int joined;
39 float volume;
40 int flush;
41 thread_handle_t thread;
42 mutex_handle_t run_mutex;
43
44 /* Remote control and timing ports */
45 unsigned short control_rport;
46 unsigned short timing_rport;
47
48 /* Sockets for control, timing and data */
49 int csock, tsock, dsock;
50
51 /* Local control, timing and data ports */
52 unsigned short control_lport;
53 unsigned short timing_lport;
54 unsigned short data_lport;
55
56 struct sockaddr_storage control_saddr;
57 socklen_t control_saddr_len;
58 unsigned short control_seqnum;
59};
60
61raop_rtp_t *
62raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *fmtp,
63 const unsigned char *aeskey, const unsigned char *aesiv)
64{
65 raop_rtp_t *raop_rtp;
66
67 assert(logger);
68
69 raop_rtp = calloc(1, sizeof(raop_rtp_t));
70 if (!raop_rtp) {
71 return NULL;
72 }
73 raop_rtp->logger = logger;
74 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
75 raop_rtp->buffer = raop_buffer_init(fmtp, aeskey, aesiv);
76 if (!raop_rtp->buffer) {
77 free(raop_rtp);
78 return NULL;
79 }
80
81 raop_rtp->running = 0;
82 raop_rtp->joined = 1;
83 raop_rtp->flush = NO_FLUSH;
84 MUTEX_CREATE(raop_rtp->run_mutex);
85
86 return raop_rtp;
87}
88
89void
90raop_rtp_destroy(raop_rtp_t *raop_rtp)
91{
92 if (raop_rtp) {
93 raop_rtp_stop(raop_rtp);
94
95 MUTEX_DESTROY(raop_rtp->run_mutex);
96 raop_buffer_destroy(raop_rtp->buffer);
97 free(raop_rtp);
98 }
99}
100
101static int
102raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
103{
104 int csock = -1, tsock = -1, dsock = -1;
105 unsigned short cport = 0, tport = 0, dport = 0;
106
107 assert(raop_rtp);
108
109 if (use_udp) {
110 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
111 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
112 if (csock == -1 || tsock == -1) {
113 goto sockets_cleanup;
114 }
115 }
116 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
117 if (dsock == -1) {
118 goto sockets_cleanup;
119 }
120
121 /* Listen to the data socket if using TCP */
122 if (!use_udp) {
123 if (listen(dsock, 1) < 0)
124 goto sockets_cleanup;
125 }
126
127 /* Set socket descriptors */
128 raop_rtp->csock = csock;
129 raop_rtp->tsock = tsock;
130 raop_rtp->dsock = dsock;
131
132 /* Set port values */
133 raop_rtp->control_lport = cport;
134 raop_rtp->timing_lport = tport;
135 raop_rtp->data_lport = dport;
136 return 0;
137
138sockets_cleanup:
139 if (csock != -1) closesocket(csock);
140 if (tsock != -1) closesocket(tsock);
141 if (dsock != -1) closesocket(dsock);
142 return -1;
143}
144
145static int
146raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
147{
148 raop_rtp_t *raop_rtp = opaque;
149 unsigned char packet[8];
150 unsigned short ourseqnum;
151 struct sockaddr *addr;
152 socklen_t addrlen;
153
154 addr = (struct sockaddr *)&raop_rtp->control_saddr;
155 addrlen = raop_rtp->control_saddr_len;
156
14003408 157 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d\n", seqnum, count);
2340bcd3
JVH
158 ourseqnum = raop_rtp->control_seqnum++;
159
160 /* Fill the request buffer */
161 packet[0] = 0x80;
162 packet[1] = 0x55|0x80;
163 packet[2] = (ourseqnum >> 8);
164 packet[3] = ourseqnum;
165 packet[4] = (seqnum >> 8);
166 packet[5] = seqnum;
167 packet[6] = (count >> 8);
168 packet[7] = count;
169
170 sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
171 return 0;
172}
173
174static THREAD_RETVAL
175raop_rtp_thread_udp(void *arg)
176{
177 raop_rtp_t *raop_rtp = arg;
178 unsigned char packet[RAOP_PACKET_LEN];
179 unsigned int packetlen;
180 struct sockaddr_storage saddr;
181 socklen_t saddrlen;
182
183 const ALACSpecificConfig *config;
184 void *cb_data = NULL;
185
186 assert(raop_rtp);
187
188 config = raop_buffer_get_config(raop_rtp->buffer);
189 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
190 config->bitDepth,
191 config->numChannels,
192 config->sampleRate);
193
194 while(1) {
195 int volume_changed;
1b4a582b 196 float volume = 0.0;
2340bcd3
JVH
197 int flush;
198
199 fd_set rfds;
200 struct timeval tv;
201 int nfds, ret;
202
203 MUTEX_LOCK(raop_rtp->run_mutex);
204 if (!raop_rtp->running) {
205 MUTEX_UNLOCK(raop_rtp->run_mutex);
206 break;
207 }
208 /* Read the volume level */
209 volume_changed = (volume != raop_rtp->volume);
210 volume = raop_rtp->volume;
211
212 /* Read the flush value */
213 flush = raop_rtp->flush;
214 raop_rtp->flush = NO_FLUSH;
215 MUTEX_UNLOCK(raop_rtp->run_mutex);
216
217 /* Call set_volume callback if changed */
218 if (volume_changed) {
219 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
220 }
221 if (flush != NO_FLUSH) {
222 raop_buffer_flush(raop_rtp->buffer, flush);
223 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
224 }
225
226 /* Set timeout value to 5ms */
227 tv.tv_sec = 0;
228 tv.tv_usec = 5000;
229
230 /* Get the correct nfds value */
231 nfds = raop_rtp->csock+1;
232 if (raop_rtp->tsock >= nfds)
233 nfds = raop_rtp->tsock+1;
234 if (raop_rtp->dsock >= nfds)
235 nfds = raop_rtp->dsock+1;
236
237 /* Set rfds and call select */
238 FD_ZERO(&rfds);
239 FD_SET(raop_rtp->csock, &rfds);
240 FD_SET(raop_rtp->tsock, &rfds);
241 FD_SET(raop_rtp->dsock, &rfds);
242 ret = select(nfds, &rfds, NULL, NULL, &tv);
243 if (ret == 0) {
244 /* Timeout happened */
245 continue;
246 } else if (ret == -1) {
247 /* FIXME: Error happened */
248 break;
249 }
250
251 if (FD_ISSET(raop_rtp->csock, &rfds)) {
252 saddrlen = sizeof(saddr);
253 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
254 (struct sockaddr *)&saddr, &saddrlen);
255
256 /* FIXME: Get destination address here */
257 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
258 raop_rtp->control_saddr_len = saddrlen;
259
260 if (packetlen >= 12) {
261 char type = packet[1] & ~0x80;
262
14003408 263 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x\n", type);
2340bcd3
JVH
264 if (type == 0x56) {
265 /* Handle resent data packet */
266 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
267 assert(ret >= 0);
268 }
269 }
270 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
271 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue\n");
272 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
273 saddrlen = sizeof(saddr);
274 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
275 (struct sockaddr *)&saddr, &saddrlen);
276 if (packetlen >= 12) {
277 int no_resend = (raop_rtp->control_rport == 0);
278 int ret;
279
280 const void *audiobuf;
281 int audiobuflen;
282
283 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
284 assert(ret >= 0);
285
286 /* Decode all frames in queue */
287 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
288 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
289 }
290
291 /* Handle possible resend requests */
292 if (!no_resend) {
293 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
294 }
295 }
296 }
297 }
298 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
299 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
300
301 return 0;
302}
303
304static THREAD_RETVAL
305raop_rtp_thread_tcp(void *arg)
306{
307 raop_rtp_t *raop_rtp = arg;
308 int stream_fd = -1;
309 unsigned char packet[RAOP_PACKET_LEN];
310 unsigned int packetlen = 0;
311
312 const ALACSpecificConfig *config;
313 void *cb_data = NULL;
314
315 assert(raop_rtp);
316
317 config = raop_buffer_get_config(raop_rtp->buffer);
318 raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls, &cb_data,
319 config->bitDepth,
320 config->numChannels,
321 config->sampleRate);
322
323 while (1) {
324 int volume_changed;
1b4a582b 325 float volume = 0.0;
2340bcd3
JVH
326
327 fd_set rfds;
328 struct timeval tv;
329 int nfds, ret;
330
331 MUTEX_LOCK(raop_rtp->run_mutex);
332 if (!raop_rtp->running) {
333 MUTEX_UNLOCK(raop_rtp->run_mutex);
334 break;
335 }
336 volume_changed = (volume != raop_rtp->volume);
337 volume = raop_rtp->volume;
338 MUTEX_UNLOCK(raop_rtp->run_mutex);
339
340 /* Call set_volume callback if changed */
341 if (volume_changed) {
342 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
343 }
344
345 /* Set timeout value to 5ms */
346 tv.tv_sec = 0;
347 tv.tv_usec = 5000;
348
349 /* Get the correct nfds value and set rfds */
350 FD_ZERO(&rfds);
351 if (stream_fd == -1) {
352 FD_SET(raop_rtp->dsock, &rfds);
353 nfds = raop_rtp->dsock+1;
354 } else {
355 FD_SET(stream_fd, &rfds);
356 nfds = stream_fd+1;
357 }
358 ret = select(nfds, &rfds, NULL, NULL, &tv);
359 if (ret == 0) {
360 /* Timeout happened */
361 continue;
362 } else if (ret == -1) {
363 /* FIXME: Error happened */
364 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select\n");
365 break;
366 }
367 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
368 struct sockaddr_storage saddr;
369 socklen_t saddrlen;
370
371 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client\n");
372 saddrlen = sizeof(saddr);
373 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
374 if (stream_fd == -1) {
375 /* FIXME: Error happened */
376 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s\n", errno, strerror(errno));
377 break;
378 }
379 }
380 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
381 unsigned int rtplen=0;
382 char type;
383
384 const void *audiobuf;
385 int audiobuflen;
386
387 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
388 if (ret == 0) {
389 /* TCP socket closed */
390 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed\n");
391 break;
392 } else if (ret == -1) {
393 /* FIXME: Error happened */
394 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv\n");
395 break;
396 }
397 packetlen += ret;
398
399 /* Check that we have enough bytes */
400 if (packetlen < 4) {
401 continue;
402 }
403 if (packet[0] != '$' || packet[1] != '\0') {
404 /* FIXME: Incorrect RTP magic bytes */
405 break;
406 }
407 rtplen = (packet[2] << 8) | packet[3];
408 if (rtplen > sizeof(packet)) {
409 /* FIXME: Too long packet */
410 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d\n", rtplen);
411 break;
412 }
413 if (packetlen < 4+rtplen) {
414 continue;
415 }
416
417 /* Packet is valid, process it */
418 type = packet[4+1] & ~0x80;
419 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
420 assert(ret >= 0);
421
422 /* Remove processed bytes from packet buffer */
423 memmove(packet, packet+4+rtplen, packetlen-rtplen);
424 packetlen -= 4+rtplen;
425
426 /* Decode the received frame */
427 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
428 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
429 }
430 }
431 }
432
433 /* Close the stream file descriptor */
434 if (stream_fd != -1) {
435 closesocket(stream_fd);
436 }
437
438 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting thread\n");
439 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
440
441 return 0;
442}
443
444void
445raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
446 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
447{
448 assert(raop_rtp);
449
450 MUTEX_LOCK(raop_rtp->run_mutex);
451 if (raop_rtp->running || !raop_rtp->joined) {
452 MUTEX_UNLOCK(raop_rtp->run_mutex);
453 return;
454 }
455
456 /* Initialize ports and sockets */
457 raop_rtp->control_rport = control_rport;
458 raop_rtp->timing_rport = timing_rport;
459 if (raop_rtp_init_sockets(raop_rtp, 1, use_udp) < 0) {
460 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed\n");
461 MUTEX_UNLOCK(raop_rtp->run_mutex);
462 return;
463 }
464 if (control_lport) *control_lport = raop_rtp->control_lport;
465 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
466 if (data_lport) *data_lport = raop_rtp->data_lport;
467
468 /* Create the thread and initialize running values */
469 raop_rtp->running = 1;
470 raop_rtp->joined = 0;
471 if (use_udp) {
472 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
473 } else {
474 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
475 }
476 MUTEX_UNLOCK(raop_rtp->run_mutex);
477}
478
479void
480raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
481{
482 assert(raop_rtp);
483
484 if (volume > 0.0f) {
485 volume = 0.0f;
486 } else if (volume < -144.0f) {
487 volume = -144.0f;
488 }
489
490 /* Set volume in thread instead */
491 MUTEX_LOCK(raop_rtp->run_mutex);
492 raop_rtp->volume = volume;
493 MUTEX_UNLOCK(raop_rtp->run_mutex);
494}
495
496void
497raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
498{
499 assert(raop_rtp);
500
501 /* Call flush in thread instead */
502 MUTEX_LOCK(raop_rtp->run_mutex);
503 raop_rtp->flush = next_seq;
504 MUTEX_UNLOCK(raop_rtp->run_mutex);
505}
506
507void
508raop_rtp_stop(raop_rtp_t *raop_rtp)
509{
510 assert(raop_rtp);
511
512 /* Check that we are running and thread is not
513 * joined (should never be while still running) */
514 MUTEX_LOCK(raop_rtp->run_mutex);
515 if (!raop_rtp->running || raop_rtp->joined) {
516 MUTEX_UNLOCK(raop_rtp->run_mutex);
517 return;
518 }
519 raop_rtp->running = 0;
520 MUTEX_UNLOCK(raop_rtp->run_mutex);
521
522 /* Join the thread */
523 THREAD_JOIN(raop_rtp->thread);
524 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
525 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
526 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
527
528 /* Flush buffer into initial state */
529 raop_buffer_flush(raop_rtp->buffer, -1);
530
531 /* Mark thread as joined */
532 MUTEX_LOCK(raop_rtp->run_mutex);
533 raop_rtp->joined = 1;
534 MUTEX_UNLOCK(raop_rtp->run_mutex);
535}