Add some compatibility defines to dnssd.c.
[deb_shairplay.git] / src / lib / raop_rtp.c
CommitLineData
23e7e3ae
JVH
1/**
2 * Copyright (C) 2011-2012 Juho Vähä-Herttua
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 */
14
2340bcd3
JVH
15#include <stdlib.h>
16#include <stdio.h>
17#include <string.h>
18#include <assert.h>
19#include <errno.h>
20
21#include "raop_rtp.h"
22#include "raop.h"
23#include "raop_buffer.h"
24#include "netutils.h"
ba0970e1 25#include "utils.h"
2340bcd3
JVH
26#include "compat.h"
27#include "logger.h"
28
29#define NO_FLUSH (-42)
30
31struct raop_rtp_s {
32 logger_t *logger;
33 raop_callbacks_t callbacks;
34
ba0970e1 35 /* Buffer to handle all resends */
2340bcd3
JVH
36 raop_buffer_t *buffer;
37
ba0970e1
JVH
38 /* Remote address as sockaddr */
39 struct sockaddr_storage remote_saddr;
40 socklen_t remote_saddr_len;
41
3baaba9d 42 /* MUTEX LOCKED VARIABLES START */
2340bcd3
JVH
43 /* These variables only edited mutex locked */
44 int running;
45 int joined;
3baaba9d 46
2340bcd3 47 float volume;
8e3bf408 48 int volume_changed;
3baaba9d
JVH
49 unsigned char *metadata;
50 int metadata_len;
51 unsigned char *coverart;
52 int coverart_len;
53
2340bcd3
JVH
54 int flush;
55 thread_handle_t thread;
56 mutex_handle_t run_mutex;
3baaba9d 57 /* MUTEX LOCKED VARIABLES END */
2340bcd3
JVH
58
59 /* Remote control and timing ports */
60 unsigned short control_rport;
61 unsigned short timing_rport;
62
63 /* Sockets for control, timing and data */
64 int csock, tsock, dsock;
65
66 /* Local control, timing and data ports */
67 unsigned short control_lport;
68 unsigned short timing_lport;
69 unsigned short data_lport;
70
ba0970e1 71 /* Initialized after the first control packet */
2340bcd3
JVH
72 struct sockaddr_storage control_saddr;
73 socklen_t control_saddr_len;
74 unsigned short control_seqnum;
75};
76
ba0970e1
JVH
77static int
78raop_rtp_parse_remote(raop_rtp_t *raop_rtp, const char *remote)
79{
80 char *original;
81 char *current;
82 char *tmpstr;
83 int family;
84 int ret;
85
86 assert(raop_rtp);
87
88 current = original = strdup(remote);
89 if (!original) {
90 return -1;
91 }
92 tmpstr = utils_strsep(&current, " ");
93 if (strcmp(tmpstr, "IN")) {
94 free(original);
95 return -1;
96 }
97 tmpstr = utils_strsep(&current, " ");
98 if (!strcmp(tmpstr, "IP4") && current) {
99 family = AF_INET;
100 } else if (!strcmp(tmpstr, "IP6") && current) {
101 family = AF_INET6;
102 } else {
103 free(original);
104 return -1;
105 }
e66980f2
JVH
106 if (strstr(current, ":")) {
107 /* FIXME: iTunes sends IP4 even with an IPv6 address, does it mean something */
108 family = AF_INET6;
109 }
ba0970e1
JVH
110 ret = netutils_parse_address(family, current,
111 &raop_rtp->remote_saddr,
112 sizeof(raop_rtp->remote_saddr));
113 if (ret < 0) {
114 free(original);
115 return -1;
116 }
117 raop_rtp->remote_saddr_len = ret;
118 free(original);
119 return 0;
120}
121
2340bcd3 122raop_rtp_t *
ba0970e1 123raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, const char *remote,
597bc69b
JVH
124 const char *rtpmap, const char *fmtp,
125 const unsigned char *aeskey, const unsigned char *aesiv)
2340bcd3
JVH
126{
127 raop_rtp_t *raop_rtp;
128
129 assert(logger);
ba0970e1
JVH
130 assert(callbacks);
131 assert(remote);
597bc69b 132 assert(rtpmap);
ba0970e1 133 assert(fmtp);
2340bcd3
JVH
134
135 raop_rtp = calloc(1, sizeof(raop_rtp_t));
136 if (!raop_rtp) {
137 return NULL;
138 }
139 raop_rtp->logger = logger;
140 memcpy(&raop_rtp->callbacks, callbacks, sizeof(raop_callbacks_t));
597bc69b 141 raop_rtp->buffer = raop_buffer_init(rtpmap, fmtp, aeskey, aesiv);
2340bcd3
JVH
142 if (!raop_rtp->buffer) {
143 free(raop_rtp);
144 return NULL;
145 }
ba0970e1
JVH
146 if (raop_rtp_parse_remote(raop_rtp, remote) < 0) {
147 free(raop_rtp);
148 return NULL;
149 }
2340bcd3
JVH
150
151 raop_rtp->running = 0;
152 raop_rtp->joined = 1;
153 raop_rtp->flush = NO_FLUSH;
154 MUTEX_CREATE(raop_rtp->run_mutex);
155
156 return raop_rtp;
157}
158
159void
160raop_rtp_destroy(raop_rtp_t *raop_rtp)
161{
162 if (raop_rtp) {
163 raop_rtp_stop(raop_rtp);
164
165 MUTEX_DESTROY(raop_rtp->run_mutex);
166 raop_buffer_destroy(raop_rtp->buffer);
3baaba9d
JVH
167 free(raop_rtp->metadata);
168 free(raop_rtp->coverart);
2340bcd3
JVH
169 free(raop_rtp);
170 }
171}
172
173static int
174raop_rtp_init_sockets(raop_rtp_t *raop_rtp, int use_ipv6, int use_udp)
175{
176 int csock = -1, tsock = -1, dsock = -1;
177 unsigned short cport = 0, tport = 0, dport = 0;
178
179 assert(raop_rtp);
180
181 if (use_udp) {
182 csock = netutils_init_socket(&cport, use_ipv6, use_udp);
183 tsock = netutils_init_socket(&tport, use_ipv6, use_udp);
184 if (csock == -1 || tsock == -1) {
185 goto sockets_cleanup;
186 }
187 }
188 dsock = netutils_init_socket(&dport, use_ipv6, use_udp);
189 if (dsock == -1) {
190 goto sockets_cleanup;
191 }
192
193 /* Listen to the data socket if using TCP */
194 if (!use_udp) {
195 if (listen(dsock, 1) < 0)
196 goto sockets_cleanup;
197 }
198
199 /* Set socket descriptors */
200 raop_rtp->csock = csock;
201 raop_rtp->tsock = tsock;
202 raop_rtp->dsock = dsock;
203
204 /* Set port values */
205 raop_rtp->control_lport = cport;
206 raop_rtp->timing_lport = tport;
207 raop_rtp->data_lport = dport;
208 return 0;
209
210sockets_cleanup:
211 if (csock != -1) closesocket(csock);
212 if (tsock != -1) closesocket(tsock);
213 if (dsock != -1) closesocket(dsock);
214 return -1;
215}
216
217static int
218raop_rtp_resend_callback(void *opaque, unsigned short seqnum, unsigned short count)
219{
220 raop_rtp_t *raop_rtp = opaque;
221 unsigned char packet[8];
222 unsigned short ourseqnum;
223 struct sockaddr *addr;
224 socklen_t addrlen;
ba0970e1 225 int ret;
2340bcd3
JVH
226
227 addr = (struct sockaddr *)&raop_rtp->control_saddr;
228 addrlen = raop_rtp->control_saddr_len;
229
46212791 230 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got resend request %d %d", seqnum, count);
2340bcd3
JVH
231 ourseqnum = raop_rtp->control_seqnum++;
232
233 /* Fill the request buffer */
234 packet[0] = 0x80;
235 packet[1] = 0x55|0x80;
236 packet[2] = (ourseqnum >> 8);
237 packet[3] = ourseqnum;
238 packet[4] = (seqnum >> 8);
239 packet[5] = seqnum;
240 packet[6] = (count >> 8);
241 packet[7] = count;
242
ba0970e1
JVH
243 ret = sendto(raop_rtp->csock, (const char *)packet, sizeof(packet), 0, addr, addrlen);
244 if (ret == -1) {
46212791 245 logger_log(raop_rtp->logger, LOGGER_WARNING, "Resend failed: %d", SOCKET_GET_ERROR());
ba0970e1
JVH
246 }
247
2340bcd3
JVH
248 return 0;
249}
250
b4cc5b07 251static int
8e3bf408 252raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data)
b4cc5b07
JVH
253{
254 int flush;
8e3bf408 255 float volume;
b4cc5b07
JVH
256 int volume_changed;
257 unsigned char *metadata;
258 int metadata_len;
259 unsigned char *coverart;
260 int coverart_len;
261
262 assert(raop_rtp);
b4cc5b07
JVH
263
264 MUTEX_LOCK(raop_rtp->run_mutex);
265 if (!raop_rtp->running) {
266 MUTEX_UNLOCK(raop_rtp->run_mutex);
267 return 1;
268 }
269
270 /* Read the volume level */
8e3bf408
JVH
271 volume = raop_rtp->volume;
272 volume_changed = raop_rtp->volume_changed;
273 raop_rtp->volume_changed = 0;
b4cc5b07
JVH
274
275 /* Read the flush value */
276 flush = raop_rtp->flush;
277 raop_rtp->flush = NO_FLUSH;
278
279 /* Read the metadata */
280 metadata = raop_rtp->metadata;
281 metadata_len = raop_rtp->metadata_len;
282 raop_rtp->metadata = NULL;
283 raop_rtp->metadata_len = 0;
284
285 /* Read the coverart */
286 coverart = raop_rtp->coverart;
287 coverart_len = raop_rtp->coverart_len;
288 raop_rtp->coverart = NULL;
289 raop_rtp->coverart_len = 0;
290 MUTEX_UNLOCK(raop_rtp->run_mutex);
291
292 /* Call set_volume callback if changed */
293 if (volume_changed) {
294 if (raop_rtp->callbacks.audio_set_volume) {
8e3bf408 295 raop_rtp->callbacks.audio_set_volume(raop_rtp->callbacks.cls, cb_data, volume);
b4cc5b07
JVH
296 }
297 }
298
299 /* Handle flush if requested */
300 if (flush != NO_FLUSH) {
301 raop_buffer_flush(raop_rtp->buffer, flush);
302 if (raop_rtp->callbacks.audio_flush) {
303 raop_rtp->callbacks.audio_flush(raop_rtp->callbacks.cls, cb_data);
304 }
305 }
306 if (metadata != NULL) {
307 if (raop_rtp->callbacks.audio_set_metadata) {
308 raop_rtp->callbacks.audio_set_metadata(raop_rtp->callbacks.cls, cb_data, metadata, metadata_len);
309 }
310 free(metadata);
311 metadata = NULL;
312 }
313 if (coverart != NULL) {
314 if (raop_rtp->callbacks.audio_set_coverart) {
315 raop_rtp->callbacks.audio_set_coverart(raop_rtp->callbacks.cls, cb_data, coverart, coverart_len);
316 }
317 free(coverart);
318 coverart = NULL;
319 }
320 return 0;
321}
322
2340bcd3
JVH
323static THREAD_RETVAL
324raop_rtp_thread_udp(void *arg)
325{
326 raop_rtp_t *raop_rtp = arg;
327 unsigned char packet[RAOP_PACKET_LEN];
328 unsigned int packetlen;
329 struct sockaddr_storage saddr;
330 socklen_t saddrlen;
331
332 const ALACSpecificConfig *config;
333 void *cb_data = NULL;
334
335 assert(raop_rtp);
336
337 config = raop_buffer_get_config(raop_rtp->buffer);
067f00ef 338 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
2340bcd3
JVH
339 config->bitDepth,
340 config->numChannels,
341 config->sampleRate);
342
343 while(1) {
2340bcd3
JVH
344 fd_set rfds;
345 struct timeval tv;
346 int nfds, ret;
347
b4cc5b07 348 /* Check if we are still running and process callbacks */
8e3bf408 349 if (raop_rtp_process_events(raop_rtp, cb_data)) {
2340bcd3
JVH
350 break;
351 }
2340bcd3
JVH
352
353 /* Set timeout value to 5ms */
354 tv.tv_sec = 0;
355 tv.tv_usec = 5000;
356
357 /* Get the correct nfds value */
358 nfds = raop_rtp->csock+1;
359 if (raop_rtp->tsock >= nfds)
360 nfds = raop_rtp->tsock+1;
361 if (raop_rtp->dsock >= nfds)
362 nfds = raop_rtp->dsock+1;
363
364 /* Set rfds and call select */
365 FD_ZERO(&rfds);
366 FD_SET(raop_rtp->csock, &rfds);
367 FD_SET(raop_rtp->tsock, &rfds);
368 FD_SET(raop_rtp->dsock, &rfds);
369 ret = select(nfds, &rfds, NULL, NULL, &tv);
370 if (ret == 0) {
371 /* Timeout happened */
372 continue;
373 } else if (ret == -1) {
374 /* FIXME: Error happened */
375 break;
376 }
377
378 if (FD_ISSET(raop_rtp->csock, &rfds)) {
379 saddrlen = sizeof(saddr);
380 packetlen = recvfrom(raop_rtp->csock, (char *)packet, sizeof(packet), 0,
381 (struct sockaddr *)&saddr, &saddrlen);
382
ba0970e1 383 /* Get the destination address here, because we need the sin6_scope_id */
2340bcd3
JVH
384 memcpy(&raop_rtp->control_saddr, &saddr, saddrlen);
385 raop_rtp->control_saddr_len = saddrlen;
386
387 if (packetlen >= 12) {
388 char type = packet[1] & ~0x80;
389
46212791 390 logger_log(raop_rtp->logger, LOGGER_DEBUG, "Got control packet of type 0x%02x", type);
2340bcd3
JVH
391 if (type == 0x56) {
392 /* Handle resent data packet */
393 int ret = raop_buffer_queue(raop_rtp->buffer, packet+4, packetlen-4, 1);
394 assert(ret >= 0);
395 }
396 }
397 } else if (FD_ISSET(raop_rtp->tsock, &rfds)) {
46212791 398 logger_log(raop_rtp->logger, LOGGER_INFO, "Would have timing packet in queue");
2340bcd3
JVH
399 } else if (FD_ISSET(raop_rtp->dsock, &rfds)) {
400 saddrlen = sizeof(saddr);
401 packetlen = recvfrom(raop_rtp->dsock, (char *)packet, sizeof(packet), 0,
402 (struct sockaddr *)&saddr, &saddrlen);
403 if (packetlen >= 12) {
404 int no_resend = (raop_rtp->control_rport == 0);
405 int ret;
406
407 const void *audiobuf;
408 int audiobuflen;
409
410 ret = raop_buffer_queue(raop_rtp->buffer, packet, packetlen, 1);
411 assert(ret >= 0);
412
413 /* Decode all frames in queue */
414 while ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, no_resend))) {
415 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
416 }
417
418 /* Handle possible resend requests */
419 if (!no_resend) {
420 raop_buffer_handle_resends(raop_rtp->buffer, raop_rtp_resend_callback, raop_rtp);
421 }
422 }
423 }
424 }
46212791 425 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting UDP RAOP thread");
2340bcd3
JVH
426 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
427
428 return 0;
429}
430
431static THREAD_RETVAL
432raop_rtp_thread_tcp(void *arg)
433{
434 raop_rtp_t *raop_rtp = arg;
435 int stream_fd = -1;
436 unsigned char packet[RAOP_PACKET_LEN];
437 unsigned int packetlen = 0;
438
439 const ALACSpecificConfig *config;
440 void *cb_data = NULL;
441
442 assert(raop_rtp);
443
444 config = raop_buffer_get_config(raop_rtp->buffer);
067f00ef 445 cb_data = raop_rtp->callbacks.audio_init(raop_rtp->callbacks.cls,
2340bcd3
JVH
446 config->bitDepth,
447 config->numChannels,
448 config->sampleRate);
449
450 while (1) {
2340bcd3
JVH
451 fd_set rfds;
452 struct timeval tv;
453 int nfds, ret;
454
b4cc5b07 455 /* Check if we are still running and process callbacks */
8e3bf408 456 if (raop_rtp_process_events(raop_rtp, cb_data)) {
2340bcd3
JVH
457 break;
458 }
2340bcd3
JVH
459
460 /* Set timeout value to 5ms */
461 tv.tv_sec = 0;
462 tv.tv_usec = 5000;
463
464 /* Get the correct nfds value and set rfds */
465 FD_ZERO(&rfds);
466 if (stream_fd == -1) {
467 FD_SET(raop_rtp->dsock, &rfds);
468 nfds = raop_rtp->dsock+1;
469 } else {
470 FD_SET(stream_fd, &rfds);
471 nfds = stream_fd+1;
472 }
473 ret = select(nfds, &rfds, NULL, NULL, &tv);
474 if (ret == 0) {
475 /* Timeout happened */
476 continue;
477 } else if (ret == -1) {
478 /* FIXME: Error happened */
46212791 479 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in select");
2340bcd3
JVH
480 break;
481 }
482 if (stream_fd == -1 && FD_ISSET(raop_rtp->dsock, &rfds)) {
483 struct sockaddr_storage saddr;
484 socklen_t saddrlen;
485
46212791 486 logger_log(raop_rtp->logger, LOGGER_INFO, "Accepting client");
2340bcd3
JVH
487 saddrlen = sizeof(saddr);
488 stream_fd = accept(raop_rtp->dsock, (struct sockaddr *)&saddr, &saddrlen);
489 if (stream_fd == -1) {
490 /* FIXME: Error happened */
46212791 491 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in accept %d %s", errno, strerror(errno));
2340bcd3
JVH
492 break;
493 }
494 }
495 if (stream_fd != -1 && FD_ISSET(stream_fd, &rfds)) {
496 unsigned int rtplen=0;
2340bcd3
JVH
497
498 const void *audiobuf;
499 int audiobuflen;
500
501 ret = recv(stream_fd, (char *)(packet+packetlen), sizeof(packet)-packetlen, 0);
502 if (ret == 0) {
503 /* TCP socket closed */
46212791 504 logger_log(raop_rtp->logger, LOGGER_INFO, "TCP socket closed");
2340bcd3
JVH
505 break;
506 } else if (ret == -1) {
507 /* FIXME: Error happened */
46212791 508 logger_log(raop_rtp->logger, LOGGER_INFO, "Error in recv");
2340bcd3
JVH
509 break;
510 }
511 packetlen += ret;
512
513 /* Check that we have enough bytes */
514 if (packetlen < 4) {
515 continue;
516 }
517 if (packet[0] != '$' || packet[1] != '\0') {
518 /* FIXME: Incorrect RTP magic bytes */
519 break;
520 }
521 rtplen = (packet[2] << 8) | packet[3];
522 if (rtplen > sizeof(packet)) {
523 /* FIXME: Too long packet */
46212791 524 logger_log(raop_rtp->logger, LOGGER_INFO, "Error, packet too long %d", rtplen);
2340bcd3
JVH
525 break;
526 }
527 if (packetlen < 4+rtplen) {
528 continue;
529 }
530
531 /* Packet is valid, process it */
2340bcd3
JVH
532 ret = raop_buffer_queue(raop_rtp->buffer, packet+4, rtplen, 0);
533 assert(ret >= 0);
534
535 /* Remove processed bytes from packet buffer */
536 memmove(packet, packet+4+rtplen, packetlen-rtplen);
537 packetlen -= 4+rtplen;
538
539 /* Decode the received frame */
540 if ((audiobuf = raop_buffer_dequeue(raop_rtp->buffer, &audiobuflen, 1))) {
541 raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, cb_data, audiobuf, audiobuflen);
542 }
543 }
544 }
545
546 /* Close the stream file descriptor */
547 if (stream_fd != -1) {
548 closesocket(stream_fd);
549 }
550
46212791 551 logger_log(raop_rtp->logger, LOGGER_INFO, "Exiting TCP RAOP thread");
2340bcd3
JVH
552 raop_rtp->callbacks.audio_destroy(raop_rtp->callbacks.cls, cb_data);
553
554 return 0;
555}
556
557void
558raop_rtp_start(raop_rtp_t *raop_rtp, int use_udp, unsigned short control_rport, unsigned short timing_rport,
559 unsigned short *control_lport, unsigned short *timing_lport, unsigned short *data_lport)
560{
ba0970e1
JVH
561 int use_ipv6 = 0;
562
2340bcd3
JVH
563 assert(raop_rtp);
564
565 MUTEX_LOCK(raop_rtp->run_mutex);
566 if (raop_rtp->running || !raop_rtp->joined) {
567 MUTEX_UNLOCK(raop_rtp->run_mutex);
568 return;
569 }
570
571 /* Initialize ports and sockets */
572 raop_rtp->control_rport = control_rport;
573 raop_rtp->timing_rport = timing_rport;
ba0970e1
JVH
574 if (raop_rtp->remote_saddr.ss_family == AF_INET6) {
575 use_ipv6 = 1;
576 }
577 if (raop_rtp_init_sockets(raop_rtp, use_ipv6, use_udp) < 0) {
46212791 578 logger_log(raop_rtp->logger, LOGGER_INFO, "Initializing sockets failed");
2340bcd3
JVH
579 MUTEX_UNLOCK(raop_rtp->run_mutex);
580 return;
581 }
582 if (control_lport) *control_lport = raop_rtp->control_lport;
583 if (timing_lport) *timing_lport = raop_rtp->timing_lport;
584 if (data_lport) *data_lport = raop_rtp->data_lport;
585
586 /* Create the thread and initialize running values */
587 raop_rtp->running = 1;
588 raop_rtp->joined = 0;
589 if (use_udp) {
590 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_udp, raop_rtp);
591 } else {
592 THREAD_CREATE(raop_rtp->thread, raop_rtp_thread_tcp, raop_rtp);
593 }
594 MUTEX_UNLOCK(raop_rtp->run_mutex);
595}
596
597void
598raop_rtp_set_volume(raop_rtp_t *raop_rtp, float volume)
599{
600 assert(raop_rtp);
601
602 if (volume > 0.0f) {
603 volume = 0.0f;
604 } else if (volume < -144.0f) {
605 volume = -144.0f;
606 }
607
608 /* Set volume in thread instead */
609 MUTEX_LOCK(raop_rtp->run_mutex);
610 raop_rtp->volume = volume;
8e3bf408 611 raop_rtp->volume_changed = 1;
2340bcd3 612 MUTEX_UNLOCK(raop_rtp->run_mutex);
3baaba9d
JVH
613}
614
615void
616raop_rtp_set_metadata(raop_rtp_t *raop_rtp, const char *data, int datalen)
617{
618 unsigned char *metadata;
619
620 assert(raop_rtp);
621
622 if (datalen <= 0) {
623 return;
624 }
625 metadata = malloc(datalen);
626 assert(metadata);
627 memcpy(metadata, data, datalen);
628
629 /* Set metadata in thread instead */
630 MUTEX_LOCK(raop_rtp->run_mutex);
631 raop_rtp->metadata = metadata;
632 raop_rtp->metadata_len = datalen;
633 MUTEX_UNLOCK(raop_rtp->run_mutex);
634}
635
636void
637raop_rtp_set_coverart(raop_rtp_t *raop_rtp, const char *data, int datalen)
638{
639 unsigned char *coverart;
640
641 assert(raop_rtp);
642
643 if (datalen <= 0) {
644 return;
645 }
646 coverart = malloc(datalen);
647 assert(coverart);
648 memcpy(coverart, data, datalen);
649
650 /* Set coverart in thread instead */
651 MUTEX_LOCK(raop_rtp->run_mutex);
652 raop_rtp->coverart = coverart;
653 raop_rtp->coverart_len = datalen;
654 MUTEX_UNLOCK(raop_rtp->run_mutex);
2340bcd3
JVH
655}
656
657void
658raop_rtp_flush(raop_rtp_t *raop_rtp, int next_seq)
659{
660 assert(raop_rtp);
661
662 /* Call flush in thread instead */
663 MUTEX_LOCK(raop_rtp->run_mutex);
664 raop_rtp->flush = next_seq;
665 MUTEX_UNLOCK(raop_rtp->run_mutex);
666}
667
668void
669raop_rtp_stop(raop_rtp_t *raop_rtp)
670{
671 assert(raop_rtp);
672
673 /* Check that we are running and thread is not
674 * joined (should never be while still running) */
675 MUTEX_LOCK(raop_rtp->run_mutex);
676 if (!raop_rtp->running || raop_rtp->joined) {
677 MUTEX_UNLOCK(raop_rtp->run_mutex);
678 return;
679 }
680 raop_rtp->running = 0;
681 MUTEX_UNLOCK(raop_rtp->run_mutex);
682
683 /* Join the thread */
684 THREAD_JOIN(raop_rtp->thread);
685 if (raop_rtp->csock != -1) closesocket(raop_rtp->csock);
686 if (raop_rtp->tsock != -1) closesocket(raop_rtp->tsock);
687 if (raop_rtp->dsock != -1) closesocket(raop_rtp->dsock);
688
689 /* Flush buffer into initial state */
690 raop_buffer_flush(raop_rtp->buffer, -1);
691
692 /* Mark thread as joined */
693 MUTEX_LOCK(raop_rtp->run_mutex);
694 raop_rtp->joined = 1;
695 MUTEX_UNLOCK(raop_rtp->run_mutex);
696}