2 * UDP prototype streaming system
3 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
30 #include "avio_internal.h"
31 #include "libavutil/parseutils.h"
32 #include "libavutil/fifo.h"
33 #include "libavutil/intreadwrite.h"
34 #include "libavutil/avstring.h"
35 #include "libavutil/opt.h"
36 #include "libavutil/log.h"
37 #include "libavutil/time.h"
40 #include "os_support.h"
43 #if HAVE_PTHREAD_CANCEL
47 #ifndef HAVE_PTHREAD_CANCEL
48 #define HAVE_PTHREAD_CANCEL 0
51 #ifndef IPV6_ADD_MEMBERSHIP
52 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
53 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
56 #define UDP_TX_BUF_SIZE 32768
57 #define UDP_MAX_PKT_SIZE 65536
69 struct sockaddr_storage dest_addr
;
73 /* Circular Buffer variables for use in UDP receive code */
74 int circular_buffer_size
;
76 int circular_buffer_error
;
77 #if HAVE_PTHREAD_CANCEL
78 pthread_t circular_buffer_thread
;
79 pthread_mutex_t mutex
;
83 uint8_t tmp
[UDP_MAX_PKT_SIZE
+4];
88 struct sockaddr_storage local_addr_storage
;
91 #define OFFSET(x) offsetof(UDPContext, x)
92 #define D AV_OPT_FLAG_DECODING_PARAM
93 #define E AV_OPT_FLAG_ENCODING_PARAM
94 static const AVOption options
[] = {
95 {"buffer_size", "set packet buffer size in bytes", OFFSET(buffer_size
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, INT_MAX
, D
|E
},
96 {"localport", "set local port to bind to", OFFSET(local_port
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, INT_MAX
, D
|E
},
97 {"localaddr", "choose local IP address", OFFSET(local_addr
), AV_OPT_TYPE_STRING
, {.str
= ""}, 0, 0, D
|E
},
98 {"pkt_size", "set size of UDP packets", OFFSET(packet_size
), AV_OPT_TYPE_INT
, {.i64
= 1472}, 0, INT_MAX
, D
|E
},
99 {"reuse", "explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, 1, D
|E
},
100 {"broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, 1, E
},
101 {"ttl", "set the time to live value (for multicast only)", OFFSET(ttl
), AV_OPT_TYPE_INT
, {.i64
= 16}, 0, INT_MAX
, E
},
102 {"connect", "set if connect() should be called on socket", OFFSET(is_connected
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, 1, D
|E
},
103 /* TODO 'sources', 'block' option */
104 {"fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size
), AV_OPT_TYPE_INT
, {.i64
= 7*4096}, 0, INT_MAX
, D
},
105 {"overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, 1, D
},
106 {"timeout", "set raise error timeout (only in read mode)", OFFSET(timeout
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, INT_MAX
, D
},
110 static const AVClass udp_context_class
= {
112 .item_name
= av_default_item_name
,
114 .version
= LIBAVUTIL_VERSION_INT
,
/**
 * Log the most recent network error, prefixed by a caller-supplied label.
 *
 * @param ctx    logging context passed through to av_log()
 * @param level  av_log() severity
 * @param prefix text printed before the error description
 */
static void log_net_error(void *ctx, int level, const char* prefix)
{
    char description[100];
    const int net_err = ff_neterrno();

    av_strerror(net_err, description, sizeof(description));
    av_log(ctx, level, "%s: %s\n", prefix, description);
}
124 static int udp_set_multicast_ttl(int sockfd
, int mcastTTL
,
125 struct sockaddr
*addr
)
127 #ifdef IP_MULTICAST_TTL
128 if (addr
->sa_family
== AF_INET
) {
129 if (setsockopt(sockfd
, IPPROTO_IP
, IP_MULTICAST_TTL
, &mcastTTL
, sizeof(mcastTTL
)) < 0) {
130 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_MULTICAST_TTL)");
135 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
136 if (addr
->sa_family
== AF_INET6
) {
137 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_MULTICAST_HOPS
, &mcastTTL
, sizeof(mcastTTL
)) < 0) {
138 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_MULTICAST_HOPS)");
146 static int udp_join_multicast_group(int sockfd
, struct sockaddr
*addr
,struct sockaddr
*local_addr
)
148 #ifdef IP_ADD_MEMBERSHIP
149 if (addr
->sa_family
== AF_INET
) {
152 mreq
.imr_multiaddr
.s_addr
= ((struct sockaddr_in
*)addr
)->sin_addr
.s_addr
;
154 mreq
.imr_interface
= ((struct sockaddr_in
*)local_addr
)->sin_addr
;
156 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
157 if (setsockopt(sockfd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, (const void *)&mreq
, sizeof(mreq
)) < 0) {
158 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_ADD_MEMBERSHIP)");
163 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
164 if (addr
->sa_family
== AF_INET6
) {
165 struct ipv6_mreq mreq6
;
167 memcpy(&mreq6
.ipv6mr_multiaddr
, &(((struct sockaddr_in6
*)addr
)->sin6_addr
), sizeof(struct in6_addr
));
168 mreq6
.ipv6mr_interface
= 0;
169 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_ADD_MEMBERSHIP
, &mreq6
, sizeof(mreq6
)) < 0) {
170 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_ADD_MEMBERSHIP)");
178 static int udp_leave_multicast_group(int sockfd
, struct sockaddr
*addr
,struct sockaddr
*local_addr
)
180 #ifdef IP_DROP_MEMBERSHIP
181 if (addr
->sa_family
== AF_INET
) {
184 mreq
.imr_multiaddr
.s_addr
= ((struct sockaddr_in
*)addr
)->sin_addr
.s_addr
;
186 mreq
.imr_interface
= ((struct sockaddr_in
*)local_addr
)->sin_addr
;
188 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
189 if (setsockopt(sockfd
, IPPROTO_IP
, IP_DROP_MEMBERSHIP
, (const void *)&mreq
, sizeof(mreq
)) < 0) {
190 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_DROP_MEMBERSHIP)");
195 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
196 if (addr
->sa_family
== AF_INET6
) {
197 struct ipv6_mreq mreq6
;
199 memcpy(&mreq6
.ipv6mr_multiaddr
, &(((struct sockaddr_in6
*)addr
)->sin6_addr
), sizeof(struct in6_addr
));
200 mreq6
.ipv6mr_interface
= 0;
201 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_DROP_MEMBERSHIP
, &mreq6
, sizeof(mreq6
)) < 0) {
202 log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_DROP_MEMBERSHIP)");
/**
 * Resolve a host/port pair with getaddrinfo().
 *
 * An empty hostname, or one starting with '?', is treated as "no node".
 *
 * @param hostname host to resolve, may be NULL
 * @param port     numeric port, rendered as the service string
 * @param type     value for ai_socktype
 * @param family   value for ai_family
 * @param flags    value for ai_flags
 * @return list to be released with freeaddrinfo(), or NULL on failure
 *         (the getaddrinfo() error is logged)
 */
static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                         int type, int family, int flags)
{
    struct addrinfo hints = {
        .ai_socktype = type,
        .ai_family   = family,
        .ai_flags    = flags,
    };
    struct addrinfo *result = NULL;
    const char *node = NULL, *service = "0";
    char sport[16];
    int error;

    /* NOTE(review): the guard around this conversion is not visible in the
     * listing; "port > 0" is assumed, leaving service "0" otherwise. */
    if (port > 0) {
        snprintf(sport, sizeof(sport), "%d", port);
        service = sport;
    }
    if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?'))
        node = hostname;

    error = getaddrinfo(node, service, &hints, &result);
    if (error) {
        result = NULL;
        av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(error));
    }

    return result;
}
/**
 * Install source-specific multicast filters on a socket.
 *
 * Each entry of sources is resolved and either joined as an allowed
 * source (include != 0) or blocked (include == 0) for the group addr.
 * The protocol-independent MCAST_* interface is used where available
 * (except on Windows); otherwise the IPv4-only IP_* interface is used.
 *
 * @param sockfd     socket to configure
 * @param addr       multicast group address
 * @param addr_len   size in bytes of addr
 * @param sources    array of source address strings
 * @param nb_sources number of entries in sources
 * @param include    non-zero to include the sources, zero to block them
 * @return 0 on success, a negative AVERROR code otherwise
 */
static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
                                     int addr_len, char **sources,
                                     int nb_sources, int include)
{
#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
    /* These ones are available in the microsoft SDK, but don't seem to work
     * as on linux, so just prefer the v4-only approach there for now. */
    int i;
    for (i = 0; i < nb_sources; i++) {
        /* Zero the request so no uninitialised bytes reach the kernel. */
        struct group_source_req mreqs = { 0 };
        int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
        /* NOTE(review): the last resolver argument is not visible in this
         * listing; AI_NUMERICHOST is assumed -- confirm. */
        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                       SOCK_DGRAM, AF_UNSPEC,
                                                       AI_NUMERICHOST);
        if (!sourceaddr)
            return AVERROR(ENOENT);

        mreqs.gsr_interface = 0;
        memcpy(&mreqs.gsr_group, addr, addr_len);
        memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
        freeaddrinfo(sourceaddr);

        if (setsockopt(sockfd, level,
                       include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
            /* Capture the error before logging can overwrite it. */
            int err = ff_neterrno();
            if (include)
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
            else
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
            return err;
        }
    }
#elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
    int i;
    if (addr->sa_family != AF_INET) {
        av_log(NULL, AV_LOG_ERROR,
               "Setting multicast sources only supported for IPv4\n");
        return AVERROR(EINVAL);
    }
    for (i = 0; i < nb_sources; i++) {
        struct ip_mreq_source mreqs = { 0 };
        /* NOTE(review): last resolver argument assumed, as above. */
        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                       SOCK_DGRAM, AF_UNSPEC,
                                                       AI_NUMERICHOST);
        if (!sourceaddr)
            return AVERROR(ENOENT);
        if (sourceaddr->ai_addr->sa_family != AF_INET) {
            freeaddrinfo(sourceaddr);
            av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
                   sources[i]);
            return AVERROR(EINVAL);
        }

        mreqs.imr_multiaddr.s_addr  = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
        mreqs.imr_interface.s_addr  = INADDR_ANY;
        mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
        freeaddrinfo(sourceaddr);

        if (setsockopt(sockfd, IPPROTO_IP,
                       include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
            /* Capture the error before logging can overwrite it. */
            int err = ff_neterrno();
            if (include)
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
            else
                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
            return err;
        }
    }
#else
    return AVERROR(ENOSYS);
#endif
    return 0;
}
309 static int udp_set_url(struct sockaddr_storage
*addr
,
310 const char *hostname
, int port
)
312 struct addrinfo
*res0
;
315 res0
= udp_resolve_host(hostname
, port
, SOCK_DGRAM
, AF_UNSPEC
, 0);
316 if (!res0
) return AVERROR(EIO
);
317 memcpy(addr
, res0
->ai_addr
, res0
->ai_addrlen
);
318 addr_len
= res0
->ai_addrlen
;
324 static int udp_socket_create(UDPContext
*s
, struct sockaddr_storage
*addr
,
325 socklen_t
*addr_len
, const char *localaddr
)
328 struct addrinfo
*res0
, *res
;
329 int family
= AF_UNSPEC
;
331 if (((struct sockaddr
*) &s
->dest_addr
)->sa_family
)
332 family
= ((struct sockaddr
*) &s
->dest_addr
)->sa_family
;
333 res0
= udp_resolve_host(localaddr
[0] ? localaddr
: NULL
, s
->local_port
,
334 SOCK_DGRAM
, family
, AI_PASSIVE
);
337 for (res
= res0
; res
; res
=res
->ai_next
) {
338 udp_fd
= ff_socket(res
->ai_family
, SOCK_DGRAM
, 0);
339 if (udp_fd
!= -1) break;
340 log_net_error(NULL
, AV_LOG_ERROR
, "socket");
346 memcpy(addr
, res
->ai_addr
, res
->ai_addrlen
);
347 *addr_len
= res
->ai_addrlen
;
361 static int udp_port(struct sockaddr_storage
*addr
, int addr_len
)
363 char sbuf
[sizeof(int)*3+1];
366 if ((error
= getnameinfo((struct sockaddr
*)addr
, addr_len
, NULL
, 0, sbuf
, sizeof(sbuf
), NI_NUMERICSERV
)) != 0) {
367 av_log(NULL
, AV_LOG_ERROR
, "getnameinfo: %s\n", gai_strerror(error
));
371 return strtol(sbuf
, NULL
, 10);
376 * If no filename is given to av_open_input_file because you want to
377 * get the local port first, then you must call this function to set
378 * the remote server address.
380 * url syntax: udp://host:port[?option=val...]
381 * option: 'ttl=n' : set the ttl value (for multicast only)
382 * 'localport=n' : set the local port
383 * 'pkt_size=n' : set max packet size
384 * 'reuse=1' : enable reusing the socket
385 * 'overrun_nonfatal=1': survive in case of circular buffer overrun
387 * @param h media file context
388 * @param uri of the remote server
389 * @return zero if no error.
391 int ff_udp_set_remote_url(URLContext
*h
, const char *uri
)
393 UDPContext
*s
= h
->priv_data
;
394 char hostname
[256], buf
[10];
398 av_url_split(NULL
, 0, NULL
, 0, hostname
, sizeof(hostname
), &port
, NULL
, 0, uri
);
400 /* set the destination address */
401 s
->dest_addr_len
= udp_set_url(&s
->dest_addr
, hostname
, port
);
402 if (s
->dest_addr_len
< 0) {
405 s
->is_multicast
= ff_is_multicast_address((struct sockaddr
*) &s
->dest_addr
);
406 p
= strchr(uri
, '?');
408 if (av_find_info_tag(buf
, sizeof(buf
), "connect", p
)) {
409 int was_connected
= s
->is_connected
;
410 s
->is_connected
= strtol(buf
, NULL
, 10);
411 if (s
->is_connected
&& !was_connected
) {
412 if (connect(s
->udp_fd
, (struct sockaddr
*) &s
->dest_addr
,
415 log_net_error(h
, AV_LOG_ERROR
, "connect");
426 * Return the local port used by the UDP connection
427 * @param h media file context
428 * @return the local port number
430 int ff_udp_get_local_port(URLContext
*h
)
432 UDPContext
*s
= h
->priv_data
;
433 return s
->local_port
;
437 * Return the udp file handle for select() usage to wait for several RTP
438 * streams at the same time.
439 * @param h media file context
441 static int udp_get_file_handle(URLContext
*h
)
443 UDPContext
*s
= h
->priv_data
;
#if HAVE_PTHREAD_CANCEL
/**
 * Receive thread: reads datagrams from the socket in blocking mode and
 * appends them to the context FIFO.
 *
 * Each datagram is stored as a 4-byte little-endian length followed by
 * the payload. The context mutex is held except while blocked in recv();
 * cancellation is enabled only around recv() so the thread can never be
 * cancelled while it owns the mutex. On a fatal error the code is left
 * in circular_buffer_error and the condition variable is signalled so a
 * waiting reader wakes up.
 *
 * @param _URLContext the URLContext owning the UDP context
 * @return always NULL
 */
static void *circular_buffer_task( void *_URLContext)
{
    URLContext *h = _URLContext;
    UDPContext *s = h->priv_data;
    int old_cancelstate;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
    pthread_mutex_lock(&s->mutex);
    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
        /* Fix: the message previously lacked its terminating newline. */
        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
        s->circular_buffer_error = AVERROR(EIO);
        goto end;
    }
    while(1) {
        int len;

        pthread_mutex_unlock(&s->mutex);
        /* Blocking operations are always cancellation points;
           see "General Information" / "Thread Cancelation Overview"
           in Single Unix. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
        len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
        pthread_mutex_lock(&s->mutex);
        if (len < 0) {
            /* EAGAIN/EINTR are transient: just retry. */
            if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                s->circular_buffer_error = ff_neterrno();
                goto end;
            }
            continue;
        }
        /* Prefix the payload with its length for udp_read(). */
        AV_WL32(s->tmp, len);

        if(av_fifo_space(s->fifo) < len + 4) {
            /* No Space left */
            if (s->overrun_nonfatal) {
                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
                        "Surviving due to overrun_nonfatal option\n");
                continue;
            } else {
                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
                        "To avoid, increase fifo_size URL option. "
                        "To survive in such case, use overrun_nonfatal option\n");
                s->circular_buffer_error = AVERROR(EIO);
                goto end;
            }
        }
        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
        pthread_cond_signal(&s->cond);
    }

end:
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}
#endif
/**
 * Split a comma-separated list in place and duplicate each entry.
 *
 * Commas in buf are overwritten with NUL. Entries are appended to
 * sources starting at index *num_sources, which is advanced for every
 * entry stored. Parsing stops at the end of the list or once max_sources
 * entries are held; nothing is written when the array is already full.
 *
 * @param buf         list to split (modified)
 * @param sources     destination array of av_strdup()ed strings
 * @param num_sources in/out count of used entries in sources
 * @param max_sources capacity of sources
 * @return 0 on success, AVERROR(ENOMEM) if a duplication failed
 */
static int parse_source_list(char *buf, char **sources, int *num_sources,
                             int max_sources)
{
    char *source_start = buf;

    /* Check capacity before storing so a full array is never overrun. */
    while (source_start && *num_sources < max_sources) {
        char *next = strchr(source_start, ',');

        if (next)
            *next = '\0';
        sources[*num_sources] = av_strdup(source_start);
        if (!sources[*num_sources])
            return AVERROR(ENOMEM);
        (*num_sources)++;
        /* Never form "NULL + 1": stop cleanly after the last entry. */
        source_start = next ? next + 1 : NULL;
    }
    return 0;
}
527 /* put it in UDP context */
528 /* return non zero if error */
529 static int udp_open(URLContext
*h
, const char *uri
, int flags
)
531 char hostname
[1024], localaddr
[1024] = "";
532 int port
, udp_fd
= -1, tmp
, bind_ret
= -1;
533 UDPContext
*s
= h
->priv_data
;
537 struct sockaddr_storage my_addr
;
539 int reuse_specified
= 0;
540 int i
, num_include_sources
= 0, num_exclude_sources
= 0;
541 char *include_sources
[32], *exclude_sources
[32];
545 is_output
= !(flags
& AVIO_FLAG_READ
);
546 if (!s
->buffer_size
) /* if not set explicitly */
547 s
->buffer_size
= is_output
? UDP_TX_BUF_SIZE
: UDP_MAX_PKT_SIZE
;
549 p
= strchr(uri
, '?');
551 if (av_find_info_tag(buf
, sizeof(buf
), "reuse", p
)) {
553 s
->reuse_socket
= strtol(buf
, &endptr
, 10);
554 /* assume if no digits were found it is a request to enable it */
559 if (av_find_info_tag(buf
, sizeof(buf
), "overrun_nonfatal", p
)) {
561 s
->overrun_nonfatal
= strtol(buf
, &endptr
, 10);
562 /* assume if no digits were found it is a request to enable it */
564 s
->overrun_nonfatal
= 1;
565 if (!HAVE_PTHREAD_CANCEL
)
566 av_log(h
, AV_LOG_WARNING
,
567 "'overrun_nonfatal' option was set but it is not supported "
568 "on this build (pthread support is required)\n");
570 if (av_find_info_tag(buf
, sizeof(buf
), "ttl", p
)) {
571 s
->ttl
= strtol(buf
, NULL
, 10);
573 if (av_find_info_tag(buf
, sizeof(buf
), "localport", p
)) {
574 s
->local_port
= strtol(buf
, NULL
, 10);
576 if (av_find_info_tag(buf
, sizeof(buf
), "pkt_size", p
)) {
577 s
->packet_size
= strtol(buf
, NULL
, 10);
579 if (av_find_info_tag(buf
, sizeof(buf
), "buffer_size", p
)) {
580 s
->buffer_size
= strtol(buf
, NULL
, 10);
582 if (av_find_info_tag(buf
, sizeof(buf
), "connect", p
)) {
583 s
->is_connected
= strtol(buf
, NULL
, 10);
585 if (av_find_info_tag(buf
, sizeof(buf
), "fifo_size", p
)) {
586 s
->circular_buffer_size
= strtol(buf
, NULL
, 10);
587 if (!HAVE_PTHREAD_CANCEL
)
588 av_log(h
, AV_LOG_WARNING
,
589 "'circular_buffer_size' option was set but it is not supported "
590 "on this build (pthread support is required)\n");
592 if (av_find_info_tag(buf
, sizeof(buf
), "localaddr", p
)) {
593 av_strlcpy(localaddr
, buf
, sizeof(localaddr
));
595 if (av_find_info_tag(buf
, sizeof(buf
), "sources", p
)) {
596 if (parse_source_list(buf
, include_sources
, &num_include_sources
,
597 FF_ARRAY_ELEMS(include_sources
)))
600 if (av_find_info_tag(buf
, sizeof(buf
), "block", p
)) {
601 if (parse_source_list(buf
, exclude_sources
, &num_exclude_sources
,
602 FF_ARRAY_ELEMS(exclude_sources
)))
605 if (!is_output
&& av_find_info_tag(buf
, sizeof(buf
), "timeout", p
))
606 s
->timeout
= strtol(buf
, NULL
, 10);
607 if (is_output
&& av_find_info_tag(buf
, sizeof(buf
), "broadcast", p
))
608 s
->is_broadcast
= strtol(buf
, NULL
, 10);
610 /* handling needed to support options picking from both AVOption and URL */
611 s
->circular_buffer_size
*= 188;
612 if (flags
& AVIO_FLAG_WRITE
) {
613 h
->max_packet_size
= s
->packet_size
;
615 h
->max_packet_size
= UDP_MAX_PKT_SIZE
;
617 h
->rw_timeout
= s
->timeout
;
619 /* fill the dest addr */
620 av_url_split(NULL
, 0, NULL
, 0, hostname
, sizeof(hostname
), &port
, NULL
, 0, uri
);
622 /* XXX: fix av_url_split */
623 if (hostname
[0] == '\0' || hostname
[0] == '?') {
624 /* only accepts null hostname if input */
625 if (!(flags
& AVIO_FLAG_READ
))
628 if (ff_udp_set_remote_url(h
, uri
) < 0)
632 if ((s
->is_multicast
|| !s
->local_port
) && (h
->flags
& AVIO_FLAG_READ
))
633 s
->local_port
= port
;
634 udp_fd
= udp_socket_create(s
, &my_addr
, &len
, localaddr
[0] ? localaddr
: s
->local_addr
);
638 s
->local_addr_storage
=my_addr
; //store for future multicast join
640 /* Follow the requested reuse option, unless it's multicast in which
641 * case enable reuse unless explicitly disabled.
643 if (s
->reuse_socket
|| (s
->is_multicast
&& !reuse_specified
)) {
645 if (setsockopt (udp_fd
, SOL_SOCKET
, SO_REUSEADDR
, &(s
->reuse_socket
), sizeof(s
->reuse_socket
)) != 0)
649 if (s
->is_broadcast
) {
651 if (setsockopt (udp_fd
, SOL_SOCKET
, SO_BROADCAST
, &(s
->is_broadcast
), sizeof(s
->is_broadcast
)) != 0)
656 /* If multicast, try binding the multicast address first, to avoid
657 * receiving UDP packets from other sources aimed at the same UDP
658 * port. This fails on windows. This makes sending to the same address
659 * using sendto() fail, so only do it if we're opened in read-only mode. */
660 if (s
->is_multicast
&& !(h
->flags
& AVIO_FLAG_WRITE
)) {
661 bind_ret
= bind(udp_fd
,(struct sockaddr
*)&s
->dest_addr
, len
);
663 /* bind to the local address if not multicast or if the multicast
665 /* the bind is needed to give a port to the socket now */
666 if (bind_ret
< 0 && bind(udp_fd
,(struct sockaddr
*)&my_addr
, len
) < 0) {
667 log_net_error(h
, AV_LOG_ERROR
, "bind failed");
671 len
= sizeof(my_addr
);
672 getsockname(udp_fd
, (struct sockaddr
*)&my_addr
, &len
);
673 s
->local_port
= udp_port(&my_addr
, len
);
675 if (s
->is_multicast
) {
676 if (h
->flags
& AVIO_FLAG_WRITE
) {
678 if (udp_set_multicast_ttl(udp_fd
, s
->ttl
, (struct sockaddr
*)&s
->dest_addr
) < 0)
681 if (h
->flags
& AVIO_FLAG_READ
) {
683 if (num_include_sources
&& num_exclude_sources
) {
684 av_log(h
, AV_LOG_ERROR
, "Simultaneously including and excluding multicast sources is not supported\n");
687 if (num_include_sources
) {
688 if (udp_set_multicast_sources(udp_fd
, (struct sockaddr
*)&s
->dest_addr
, s
->dest_addr_len
, include_sources
, num_include_sources
, 1) < 0)
691 if (udp_join_multicast_group(udp_fd
, (struct sockaddr
*)&s
->dest_addr
,(struct sockaddr
*)&s
->local_addr_storage
) < 0)
694 if (num_exclude_sources
) {
695 if (udp_set_multicast_sources(udp_fd
, (struct sockaddr
*)&s
->dest_addr
, s
->dest_addr_len
, exclude_sources
, num_exclude_sources
, 0) < 0)
702 /* limit the tx buf size to limit latency */
703 tmp
= s
->buffer_size
;
704 if (setsockopt(udp_fd
, SOL_SOCKET
, SO_SNDBUF
, &tmp
, sizeof(tmp
)) < 0) {
705 log_net_error(h
, AV_LOG_ERROR
, "setsockopt(SO_SNDBUF)");
709 /* set udp recv buffer size to the requested value (default 64K) */
710 tmp
= s
->buffer_size
;
711 if (setsockopt(udp_fd
, SOL_SOCKET
, SO_RCVBUF
, &tmp
, sizeof(tmp
)) < 0) {
712 log_net_error(h
, AV_LOG_WARNING
, "setsockopt(SO_RECVBUF)");
715 if (getsockopt(udp_fd
, SOL_SOCKET
, SO_RCVBUF
, &tmp
, &len
) < 0) {
716 log_net_error(h
, AV_LOG_WARNING
, "getsockopt(SO_RCVBUF)");
718 av_log(h
, AV_LOG_DEBUG
, "end receive buffer size reported is %d\n", tmp
);
719 if(tmp
< s
->buffer_size
)
720 av_log(h
, AV_LOG_WARNING
, "attempted to set receive buffer to size %d but it only ended up set as %d", s
->buffer_size
, tmp
);
723 /* make the socket non-blocking */
724 ff_socket_nonblock(udp_fd
, 1);
726 if (s
->is_connected
) {
727 if (connect(udp_fd
, (struct sockaddr
*) &s
->dest_addr
, s
->dest_addr_len
)) {
728 log_net_error(h
, AV_LOG_ERROR
, "connect");
733 for (i
= 0; i
< num_include_sources
; i
++)
734 av_freep(&include_sources
[i
]);
735 for (i
= 0; i
< num_exclude_sources
; i
++)
736 av_freep(&exclude_sources
[i
]);
740 #if HAVE_PTHREAD_CANCEL
741 if (!is_output
&& s
->circular_buffer_size
) {
744 /* start the task going */
745 s
->fifo
= av_fifo_alloc(s
->circular_buffer_size
);
746 ret
= pthread_mutex_init(&s
->mutex
, NULL
);
748 av_log(h
, AV_LOG_ERROR
, "pthread_mutex_init failed : %s\n", strerror(ret
));
751 ret
= pthread_cond_init(&s
->cond
, NULL
);
753 av_log(h
, AV_LOG_ERROR
, "pthread_cond_init failed : %s\n", strerror(ret
));
756 ret
= pthread_create(&s
->circular_buffer_thread
, NULL
, circular_buffer_task
, h
);
758 av_log(h
, AV_LOG_ERROR
, "pthread_create failed : %s\n", strerror(ret
));
761 s
->thread_started
= 1;
766 #if HAVE_PTHREAD_CANCEL
768 pthread_cond_destroy(&s
->cond
);
770 pthread_mutex_destroy(&s
->mutex
);
775 av_fifo_freep(&s
->fifo
);
776 for (i
= 0; i
< num_include_sources
; i
++)
777 av_freep(&include_sources
[i
]);
778 for (i
= 0; i
< num_exclude_sources
; i
++)
779 av_freep(&exclude_sources
[i
]);
783 static int udp_read(URLContext
*h
, uint8_t *buf
, int size
)
785 UDPContext
*s
= h
->priv_data
;
787 #if HAVE_PTHREAD_CANCEL
788 int avail
, nonblock
= h
->flags
& AVIO_FLAG_NONBLOCK
;
791 pthread_mutex_lock(&s
->mutex
);
793 avail
= av_fifo_size(s
->fifo
);
794 if (avail
) { // >=size) {
797 av_fifo_generic_read(s
->fifo
, tmp
, 4, NULL
);
800 av_log(h
, AV_LOG_WARNING
, "Part of datagram lost due to insufficient buffer size\n");
804 av_fifo_generic_read(s
->fifo
, buf
, avail
, NULL
);
805 av_fifo_drain(s
->fifo
, AV_RL32(tmp
) - avail
);
806 pthread_mutex_unlock(&s
->mutex
);
808 } else if(s
->circular_buffer_error
){
809 int err
= s
->circular_buffer_error
;
810 pthread_mutex_unlock(&s
->mutex
);
812 } else if(nonblock
) {
813 pthread_mutex_unlock(&s
->mutex
);
814 return AVERROR(EAGAIN
);
817 /* FIXME: using the monotonic clock would be better,
818 but it does not exist on all supported platforms. */
819 int64_t t
= av_gettime() + 100000;
820 struct timespec tv
= { .tv_sec
= t
/ 1000000,
821 .tv_nsec
= (t
% 1000000) * 1000 };
822 if (pthread_cond_timedwait(&s
->cond
, &s
->mutex
, &tv
) < 0) {
823 pthread_mutex_unlock(&s
->mutex
);
824 return AVERROR(errno
== ETIMEDOUT
? EAGAIN
: errno
);
832 if (!(h
->flags
& AVIO_FLAG_NONBLOCK
)) {
833 ret
= ff_network_wait_fd(s
->udp_fd
, 0);
837 ret
= recv(s
->udp_fd
, buf
, size
, 0);
839 return ret
< 0 ? ff_neterrno() : ret
;
842 static int udp_write(URLContext
*h
, const uint8_t *buf
, int size
)
844 UDPContext
*s
= h
->priv_data
;
847 if (!(h
->flags
& AVIO_FLAG_NONBLOCK
)) {
848 ret
= ff_network_wait_fd(s
->udp_fd
, 1);
853 if (!s
->is_connected
) {
854 ret
= sendto (s
->udp_fd
, buf
, size
, 0,
855 (struct sockaddr
*) &s
->dest_addr
,
858 ret
= send(s
->udp_fd
, buf
, size
, 0);
860 return ret
< 0 ? ff_neterrno() : ret
;
863 static int udp_close(URLContext
*h
)
865 UDPContext
*s
= h
->priv_data
;
867 if (s
->is_multicast
&& (h
->flags
& AVIO_FLAG_READ
))
868 udp_leave_multicast_group(s
->udp_fd
, (struct sockaddr
*)&s
->dest_addr
,(struct sockaddr
*)&s
->local_addr_storage
);
869 closesocket(s
->udp_fd
);
870 #if HAVE_PTHREAD_CANCEL
871 if (s
->thread_started
) {
873 pthread_cancel(s
->circular_buffer_thread
);
874 ret
= pthread_join(s
->circular_buffer_thread
, NULL
);
876 av_log(h
, AV_LOG_ERROR
, "pthread_join(): %s\n", strerror(ret
));
877 pthread_mutex_destroy(&s
->mutex
);
878 pthread_cond_destroy(&s
->cond
);
881 av_fifo_freep(&s
->fifo
);
885 URLProtocol ff_udp_protocol
= {
887 .url_open
= udp_open
,
888 .url_read
= udp_read
,
889 .url_write
= udp_write
,
890 .url_close
= udp_close
,
891 .url_get_file_handle
= udp_get_file_handle
,
892 .priv_data_size
= sizeof(UDPContext
),
893 .priv_data_class
= &udp_context_class
,
894 .flags
= URL_PROTOCOL_FLAG_NETWORK
,