blob: 3835f989c42ba1c55658bc47bd106b9cdea17b32
1 | /* |
2 | * UDP prototype streaming system |
3 | * Copyright (c) 2000, 2001, 2002 Fabrice Bellard |
4 | * |
5 | * This file is part of FFmpeg. |
6 | * |
7 | * FFmpeg is free software; you can redistribute it and/or |
8 | * modify it under the terms of the GNU Lesser General Public |
9 | * License as published by the Free Software Foundation; either |
10 | * version 2.1 of the License, or (at your option) any later version. |
11 | * |
12 | * FFmpeg is distributed in the hope that it will be useful, |
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
15 | * Lesser General Public License for more details. |
16 | * |
17 | * You should have received a copy of the GNU Lesser General Public |
18 | * License along with FFmpeg; if not, write to the Free Software |
19 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
20 | */ |
21 | |
22 | /** |
23 | * @file |
24 | * UDP protocol |
25 | */ |
26 | |
27 | #define _DEFAULT_SOURCE |
28 | #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */ |
29 | |
30 | #include "avformat.h" |
31 | #include "avio_internal.h" |
32 | #include "libavutil/avassert.h" |
33 | #include "libavutil/parseutils.h" |
34 | #include "libavutil/fifo.h" |
35 | #include "libavutil/intreadwrite.h" |
36 | #include "libavutil/avstring.h" |
37 | #include "libavutil/opt.h" |
38 | #include "libavutil/log.h" |
39 | #include "libavutil/time.h" |
40 | #include "internal.h" |
41 | #include "network.h" |
42 | #include "os_support.h" |
43 | #include "url.h" |
44 | |
45 | #ifdef __APPLE__ |
46 | #include "TargetConditionals.h" |
47 | #endif |
48 | |
49 | #if HAVE_UDPLITE_H |
50 | #include "udplite.h" |
51 | #else |
52 | /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite. |
53 | * So, we provide a fallback here. |
54 | */ |
55 | #define UDPLITE_SEND_CSCOV 10 |
56 | #define UDPLITE_RECV_CSCOV 11 |
57 | #endif |
58 | |
59 | #ifndef IPPROTO_UDPLITE |
60 | #define IPPROTO_UDPLITE 136 |
61 | #endif |
62 | |
63 | #if HAVE_PTHREAD_CANCEL |
64 | #include <pthread.h> |
65 | #endif |
66 | |
67 | #ifndef HAVE_PTHREAD_CANCEL |
68 | #define HAVE_PTHREAD_CANCEL 0 |
69 | #endif |
70 | |
71 | #ifndef IPV6_ADD_MEMBERSHIP |
72 | #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP |
73 | #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP |
74 | #endif |
75 | |
/* Default socket buffer size (bytes) chosen by udp_open() for output when
 * the buffer_size option is left unset. */
#define UDP_TX_BUF_SIZE 32768
/* Default socket buffer size for input, the max_packet_size reported when
 * reading, and the payload capacity of UDPContext.tmp. */
#define UDP_MAX_PKT_SIZE 65536
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the size of a UDP header in bytes -- confirm against its users. */
#define UDP_HEADER_SIZE 8
79 | |
/**
 * Private state of one UDP / UDP-Lite URLContext (reached via h->priv_data).
 * Most scalar fields are filled from the AVOption table and may be
 * overridden by "?key=value" tags in the URL while opening.
 */
typedef struct UDPContext {
    const AVClass *class;               /* AVClass carrying the option table */
    int udp_fd;                         /* socket descriptor */
    int ttl;                            /* value passed to udp_set_multicast_ttl() for multicast output */
    int udplite_coverage;               /* non-zero selects IPPROTO_UDPLITE and sets the checksum coverage */
    int buffer_size;                    /* requested socket buffer size; negative means "pick a default" */
    int pkt_size;                       /* "pkt_size" option: maximum UDP packet size */
    int is_multicast;                   /* result of ff_is_multicast_address() on dest_addr */
    int is_broadcast;                   /* when set, SO_BROADCAST is enabled on the socket */
    int local_port;                     /* requested local port; rewritten with the bound port after bind() */
    int reuse_socket;                   /* >0 enable SO_REUSEADDR, <0 enable it only for multicast */
    int overrun_nonfatal;               /* drop packets instead of failing when the receive FIFO is full */
    struct sockaddr_storage dest_addr;  /* destination address resolved by ff_udp_set_remote_url() */
    int dest_addr_len;                  /* length of dest_addr in bytes */
    int is_connected;                   /* non-zero once/if connect() is used on the socket */

    /* Circular buffer variables shared with the worker threads */
    int circular_buffer_size;           /* "fifo_size" option; udp_open() multiplies it by 188 */
    AVFifoBuffer *fifo;                 /* queue of length-prefixed datagrams */
    int circular_buffer_error;          /* error code recorded by a worker thread */
    int64_t bitrate;                    /* number of bits to send per second */
    int64_t burst_bits;                 /* "burst_bits" option: max burst length in bits */
    int close_req;                      /* makes the tx worker exit once the FIFO holds no complete header */
#if HAVE_PTHREAD_CANCEL
    pthread_t circular_buffer_thread;   /* NOTE(review): presumably the worker thread handle; creation not visible here */
    pthread_mutex_t mutex;              /* protects fifo and circular_buffer_error */
    pthread_cond_t cond;                /* signalled when the FIFO state changes */
    int thread_started;                 /* NOTE(review): presumably set after the worker was created -- confirm */
#endif
    uint8_t tmp[UDP_MAX_PKT_SIZE+4];    /* staging buffer: 4-byte length header followed by one datagram */
    int remaining_in_dg;                /* NOTE(review): not used in the visible part of the file */
    char *localaddr;                    /* "localaddr" option: local address to bind to */
    int timeout;                        /* "timeout" option; copied to h->rw_timeout */
    struct sockaddr_storage local_addr_storage; /* local address kept for the multicast join */
    char *sources;                      /* "sources" option: comma-separated include list */
    char *block;                        /* "block" option: comma-separated exclude list */
} UDPContext;
117 | |
118 | #define OFFSET(x) offsetof(UDPContext, x) |
119 | #define D AV_OPT_FLAG_DECODING_PARAM |
120 | #define E AV_OPT_FLAG_ENCODING_PARAM |
121 | static const AVOption options[] = { |
122 | { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, |
123 | { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E }, |
124 | { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E }, |
125 | { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E }, |
126 | { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, |
127 | { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, |
128 | { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, |
129 | { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags = D|E }, |
130 | { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, D|E }, |
131 | { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E }, |
132 | { "broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, E }, |
133 | { "ttl", "Time to live (multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, { .i64 = 16 }, 0, INT_MAX, E }, |
134 | { "connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags = D|E }, |
135 | { "fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, |
136 | { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D }, |
137 | { "timeout", "set raise error timeout (only in read mode)", OFFSET(timeout), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, D }, |
138 | { "sources", "Source list", OFFSET(sources), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, |
139 | { "block", "Block list", OFFSET(block), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E }, |
140 | { NULL } |
141 | }; |
142 | |
/* AVClass describing the "udp" protocol context; it exposes the shared
 * options[] table and uses the default item-name callback. */
static const AVClass udp_class = {
    .class_name = "udp",
    .item_name = av_default_item_name,
    .option = options,
    .version = LIBAVUTIL_VERSION_INT,
};
149 | |
/* AVClass describing the "udplite" protocol context; it reuses the same
 * options[] table as udp_class and differs only in its class name. */
static const AVClass udplite_context_class = {
    .class_name = "udplite",
    .item_name = av_default_item_name,
    .option = options,
    .version = LIBAVUTIL_VERSION_INT,
};
156 | |
/**
 * Log the current network error as "<prefix>: <error text>".
 *
 * @param ctx    logging context handed to av_log() (may be NULL)
 * @param level  av_log() level
 * @param prefix label printed in front of the error description
 */
static void log_net_error(void *ctx, int level, const char* prefix)
{
    char description[100];
    const int net_error = ff_neterrno();

    av_strerror(net_error, description, sizeof(description));
    av_log(ctx, level, "%s: %s\n", prefix, description);
}
163 | |
164 | static int udp_set_multicast_ttl(int sockfd, int mcastTTL, |
165 | struct sockaddr *addr) |
166 | { |
167 | #ifdef IP_MULTICAST_TTL |
168 | if (addr->sa_family == AF_INET) { |
169 | if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) { |
170 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)"); |
171 | return -1; |
172 | } |
173 | } |
174 | #endif |
175 | #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS) |
176 | if (addr->sa_family == AF_INET6) { |
177 | if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) { |
178 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)"); |
179 | return -1; |
180 | } |
181 | } |
182 | #endif |
183 | return 0; |
184 | } |
185 | |
186 | static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) |
187 | { |
188 | #ifdef IP_ADD_MEMBERSHIP |
189 | if (addr->sa_family == AF_INET) { |
190 | struct ip_mreq mreq; |
191 | |
192 | mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
193 | if (local_addr) |
194 | mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; |
195 | else |
196 | mreq.imr_interface.s_addr= INADDR_ANY; |
197 | if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { |
198 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)"); |
199 | return -1; |
200 | } |
201 | } |
202 | #endif |
203 | #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) |
204 | if (addr->sa_family == AF_INET6) { |
205 | struct ipv6_mreq mreq6; |
206 | |
207 | memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr)); |
208 | mreq6.ipv6mr_interface= 0; |
209 | if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) { |
210 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)"); |
211 | return -1; |
212 | } |
213 | } |
214 | #endif |
215 | return 0; |
216 | } |
217 | |
218 | static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) |
219 | { |
220 | #ifdef IP_DROP_MEMBERSHIP |
221 | if (addr->sa_family == AF_INET) { |
222 | struct ip_mreq mreq; |
223 | |
224 | mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
225 | if (local_addr) |
226 | mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; |
227 | else |
228 | mreq.imr_interface.s_addr= INADDR_ANY; |
229 | if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { |
230 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)"); |
231 | return -1; |
232 | } |
233 | } |
234 | #endif |
235 | #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6) |
236 | if (addr->sa_family == AF_INET6) { |
237 | struct ipv6_mreq mreq6; |
238 | |
239 | memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr)); |
240 | mreq6.ipv6mr_interface= 0; |
241 | if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) { |
242 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)"); |
243 | return -1; |
244 | } |
245 | } |
246 | #endif |
247 | return 0; |
248 | } |
249 | |
250 | static struct addrinfo *udp_resolve_host(URLContext *h, |
251 | const char *hostname, int port, |
252 | int type, int family, int flags) |
253 | { |
254 | struct addrinfo hints = { 0 }, *res = 0; |
255 | int error; |
256 | char sport[16]; |
257 | const char *node = 0, *service = "0"; |
258 | |
259 | if (port > 0) { |
260 | snprintf(sport, sizeof(sport), "%d", port); |
261 | service = sport; |
262 | } |
263 | if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) { |
264 | node = hostname; |
265 | } |
266 | hints.ai_socktype = type; |
267 | hints.ai_family = family; |
268 | hints.ai_flags = flags; |
269 | if ((error = getaddrinfo(node, service, &hints, &res))) { |
270 | res = NULL; |
271 | av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n", |
272 | node ? node : "unknown", |
273 | service, |
274 | gai_strerror(error)); |
275 | } |
276 | |
277 | return res; |
278 | } |
279 | |
280 | static int udp_set_multicast_sources(URLContext *h, |
281 | int sockfd, struct sockaddr *addr, |
282 | int addr_len, char **sources, |
283 | int nb_sources, int include) |
284 | { |
285 | #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32) && (!defined(TARGET_OS_TV) || !TARGET_OS_TV) |
286 | /* These ones are available in the microsoft SDK, but don't seem to work |
287 | * as on linux, so just prefer the v4-only approach there for now. */ |
288 | int i; |
289 | for (i = 0; i < nb_sources; i++) { |
290 | struct group_source_req mreqs; |
291 | int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6; |
292 | struct addrinfo *sourceaddr = udp_resolve_host(h, sources[i], 0, |
293 | SOCK_DGRAM, AF_UNSPEC, |
294 | 0); |
295 | if (!sourceaddr) |
296 | return AVERROR(ENOENT); |
297 | |
298 | mreqs.gsr_interface = 0; |
299 | memcpy(&mreqs.gsr_group, addr, addr_len); |
300 | memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen); |
301 | freeaddrinfo(sourceaddr); |
302 | |
303 | if (setsockopt(sockfd, level, |
304 | include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE, |
305 | (const void *)&mreqs, sizeof(mreqs)) < 0) { |
306 | if (include) |
307 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)"); |
308 | else |
309 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)"); |
310 | return ff_neterrno(); |
311 | } |
312 | } |
313 | #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE) |
314 | int i; |
315 | if (addr->sa_family != AF_INET) { |
316 | av_log(NULL, AV_LOG_ERROR, |
317 | "Setting multicast sources only supported for IPv4\n"); |
318 | return AVERROR(EINVAL); |
319 | } |
320 | for (i = 0; i < nb_sources; i++) { |
321 | struct ip_mreq_source mreqs; |
322 | struct addrinfo *sourceaddr = udp_resolve_host(h, sources[i], 0, |
323 | SOCK_DGRAM, AF_UNSPEC, |
324 | 0); |
325 | if (!sourceaddr) |
326 | return AVERROR(ENOENT); |
327 | if (sourceaddr->ai_addr->sa_family != AF_INET) { |
328 | freeaddrinfo(sourceaddr); |
329 | av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n", |
330 | sources[i]); |
331 | return AVERROR(EINVAL); |
332 | } |
333 | |
334 | mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; |
335 | mreqs.imr_interface.s_addr = INADDR_ANY; |
336 | mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr; |
337 | freeaddrinfo(sourceaddr); |
338 | |
339 | if (setsockopt(sockfd, IPPROTO_IP, |
340 | include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE, |
341 | (const void *)&mreqs, sizeof(mreqs)) < 0) { |
342 | if (include) |
343 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)"); |
344 | else |
345 | log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)"); |
346 | return ff_neterrno(); |
347 | } |
348 | } |
349 | #else |
350 | return AVERROR(ENOSYS); |
351 | #endif |
352 | return 0; |
353 | } |
354 | static int udp_set_url(URLContext *h, |
355 | struct sockaddr_storage *addr, |
356 | const char *hostname, int port) |
357 | { |
358 | struct addrinfo *res0; |
359 | int addr_len; |
360 | |
361 | res0 = udp_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0); |
362 | if (!res0) return AVERROR(EIO); |
363 | memcpy(addr, res0->ai_addr, res0->ai_addrlen); |
364 | addr_len = res0->ai_addrlen; |
365 | freeaddrinfo(res0); |
366 | |
367 | return addr_len; |
368 | } |
369 | |
370 | static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr, |
371 | socklen_t *addr_len, const char *localaddr) |
372 | { |
373 | UDPContext *s = h->priv_data; |
374 | int udp_fd = -1; |
375 | struct addrinfo *res0, *res; |
376 | int family = AF_UNSPEC; |
377 | |
378 | if (((struct sockaddr *) &s->dest_addr)->sa_family) |
379 | family = ((struct sockaddr *) &s->dest_addr)->sa_family; |
380 | res0 = udp_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL, |
381 | s->local_port, |
382 | SOCK_DGRAM, family, AI_PASSIVE); |
383 | if (!res0) |
384 | goto fail; |
385 | for (res = res0; res; res=res->ai_next) { |
386 | if (s->udplite_coverage) |
387 | udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE); |
388 | else |
389 | udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0); |
390 | if (udp_fd != -1) break; |
391 | log_net_error(NULL, AV_LOG_ERROR, "socket"); |
392 | } |
393 | |
394 | if (udp_fd < 0) |
395 | goto fail; |
396 | |
397 | memcpy(addr, res->ai_addr, res->ai_addrlen); |
398 | *addr_len = res->ai_addrlen; |
399 | |
400 | freeaddrinfo(res0); |
401 | |
402 | return udp_fd; |
403 | |
404 | fail: |
405 | if (udp_fd >= 0) |
406 | closesocket(udp_fd); |
407 | if(res0) |
408 | freeaddrinfo(res0); |
409 | return -1; |
410 | } |
411 | |
412 | static int udp_port(struct sockaddr_storage *addr, int addr_len) |
413 | { |
414 | char sbuf[sizeof(int)*3+1]; |
415 | int error; |
416 | |
417 | if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) { |
418 | av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error)); |
419 | return -1; |
420 | } |
421 | |
422 | return strtol(sbuf, NULL, 10); |
423 | } |
424 | |
425 | |
426 | /** |
427 | * If no filename is given to av_open_input_file because you want to |
428 | * get the local port first, then you must call this function to set |
429 | * the remote server address. |
430 | * |
431 | * url syntax: udp://host:port[?option=val...] |
432 | * option: 'ttl=n' : set the ttl value (for multicast only) |
433 | * 'localport=n' : set the local port |
434 | * 'pkt_size=n' : set max packet size |
435 | * 'reuse=1' : enable reusing the socket |
436 | * 'overrun_nonfatal=1': survive in case of circular buffer overrun |
437 | * |
438 | * @param h media file context |
439 | * @param uri of the remote server |
440 | * @return zero if no error. |
441 | */ |
442 | int ff_udp_set_remote_url(URLContext *h, const char *uri) |
443 | { |
444 | UDPContext *s = h->priv_data; |
445 | char hostname[256], buf[10]; |
446 | int port; |
447 | const char *p; |
448 | |
449 | av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri); |
450 | |
451 | /* set the destination address */ |
452 | s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port); |
453 | if (s->dest_addr_len < 0) { |
454 | return AVERROR(EIO); |
455 | } |
456 | s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr); |
457 | p = strchr(uri, '?'); |
458 | if (p) { |
459 | if (av_find_info_tag(buf, sizeof(buf), "connect", p)) { |
460 | int was_connected = s->is_connected; |
461 | s->is_connected = strtol(buf, NULL, 10); |
462 | if (s->is_connected && !was_connected) { |
463 | if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr, |
464 | s->dest_addr_len)) { |
465 | s->is_connected = 0; |
466 | log_net_error(h, AV_LOG_ERROR, "connect"); |
467 | return AVERROR(EIO); |
468 | } |
469 | } |
470 | } |
471 | } |
472 | |
473 | return 0; |
474 | } |
475 | |
476 | /** |
477 | * Return the local port used by the UDP connection |
478 | * @param h media file context |
479 | * @return the local port number |
480 | */ |
481 | int ff_udp_get_local_port(URLContext *h) |
482 | { |
483 | UDPContext *s = h->priv_data; |
484 | return s->local_port; |
485 | } |
486 | |
487 | /** |
488 | * Return the udp file handle for select() usage to wait for several RTP |
489 | * streams at the same time. |
490 | * @param h media file context |
491 | */ |
492 | static int udp_get_file_handle(URLContext *h) |
493 | { |
494 | UDPContext *s = h->priv_data; |
495 | return s->udp_fd; |
496 | } |
497 | |
498 | #if HAVE_PTHREAD_CANCEL |
499 | static void *circular_buffer_task_rx( void *_URLContext) |
500 | { |
501 | URLContext *h = _URLContext; |
502 | UDPContext *s = h->priv_data; |
503 | int old_cancelstate; |
504 | |
505 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
506 | pthread_mutex_lock(&s->mutex); |
507 | if (ff_socket_nonblock(s->udp_fd, 0) < 0) { |
508 | av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); |
509 | s->circular_buffer_error = AVERROR(EIO); |
510 | goto end; |
511 | } |
512 | while(1) { |
513 | int len; |
514 | |
515 | pthread_mutex_unlock(&s->mutex); |
516 | /* Blocking operations are always cancellation points; |
517 | see "General Information" / "Thread Cancelation Overview" |
518 | in Single Unix. */ |
519 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); |
520 | len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0); |
521 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
522 | pthread_mutex_lock(&s->mutex); |
523 | if (len < 0) { |
524 | if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { |
525 | s->circular_buffer_error = ff_neterrno(); |
526 | goto end; |
527 | } |
528 | continue; |
529 | } |
530 | AV_WL32(s->tmp, len); |
531 | |
532 | if(av_fifo_space(s->fifo) < len + 4) { |
533 | /* No Space left */ |
534 | if (s->overrun_nonfatal) { |
535 | av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " |
536 | "Surviving due to overrun_nonfatal option\n"); |
537 | continue; |
538 | } else { |
539 | av_log(h, AV_LOG_ERROR, "Circular buffer overrun. " |
540 | "To avoid, increase fifo_size URL option. " |
541 | "To survive in such case, use overrun_nonfatal option\n"); |
542 | s->circular_buffer_error = AVERROR(EIO); |
543 | goto end; |
544 | } |
545 | } |
546 | av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL); |
547 | pthread_cond_signal(&s->cond); |
548 | } |
549 | |
550 | end: |
551 | pthread_cond_signal(&s->cond); |
552 | pthread_mutex_unlock(&s->mutex); |
553 | return NULL; |
554 | } |
555 | |
556 | static void *circular_buffer_task_tx( void *_URLContext) |
557 | { |
558 | URLContext *h = _URLContext; |
559 | UDPContext *s = h->priv_data; |
560 | int old_cancelstate; |
561 | int64_t target_timestamp = av_gettime_relative(); |
562 | int64_t start_timestamp = av_gettime_relative(); |
563 | int64_t sent_bits = 0; |
564 | int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0; |
565 | int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0; |
566 | |
567 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
568 | pthread_mutex_lock(&s->mutex); |
569 | |
570 | if (ff_socket_nonblock(s->udp_fd, 0) < 0) { |
571 | av_log(h, AV_LOG_ERROR, "Failed to set blocking mode"); |
572 | s->circular_buffer_error = AVERROR(EIO); |
573 | goto end; |
574 | } |
575 | |
576 | for(;;) { |
577 | int len; |
578 | const uint8_t *p; |
579 | uint8_t tmp[4]; |
580 | int64_t timestamp; |
581 | |
582 | len=av_fifo_size(s->fifo); |
583 | |
584 | while (len<4) { |
585 | if (s->close_req) |
586 | goto end; |
587 | if (pthread_cond_wait(&s->cond, &s->mutex) < 0) { |
588 | goto end; |
589 | } |
590 | len=av_fifo_size(s->fifo); |
591 | } |
592 | |
593 | av_fifo_generic_read(s->fifo, tmp, 4, NULL); |
594 | len=AV_RL32(tmp); |
595 | |
596 | av_assert0(len >= 0); |
597 | av_assert0(len <= sizeof(s->tmp)); |
598 | |
599 | av_fifo_generic_read(s->fifo, s->tmp, len, NULL); |
600 | |
601 | pthread_mutex_unlock(&s->mutex); |
602 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); |
603 | |
604 | if (s->bitrate) { |
605 | timestamp = av_gettime_relative(); |
606 | if (timestamp < target_timestamp) { |
607 | int64_t delay = target_timestamp - timestamp; |
608 | if (delay > max_delay) { |
609 | delay = max_delay; |
610 | start_timestamp = timestamp + delay; |
611 | sent_bits = 0; |
612 | } |
613 | av_usleep(delay); |
614 | } else { |
615 | if (timestamp - burst_interval > target_timestamp) { |
616 | start_timestamp = timestamp - burst_interval; |
617 | sent_bits = 0; |
618 | } |
619 | } |
620 | sent_bits += len * 8; |
621 | target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate; |
622 | } |
623 | |
624 | p = s->tmp; |
625 | while (len) { |
626 | int ret; |
627 | av_assert0(len > 0); |
628 | if (!s->is_connected) { |
629 | ret = sendto (s->udp_fd, p, len, 0, |
630 | (struct sockaddr *) &s->dest_addr, |
631 | s->dest_addr_len); |
632 | } else |
633 | ret = send(s->udp_fd, p, len, 0); |
634 | if (ret >= 0) { |
635 | len -= ret; |
636 | p += ret; |
637 | } else { |
638 | ret = ff_neterrno(); |
639 | if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) { |
640 | pthread_mutex_lock(&s->mutex); |
641 | s->circular_buffer_error = ret; |
642 | pthread_mutex_unlock(&s->mutex); |
643 | return NULL; |
644 | } |
645 | } |
646 | } |
647 | |
648 | pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); |
649 | pthread_mutex_lock(&s->mutex); |
650 | } |
651 | |
652 | end: |
653 | pthread_mutex_unlock(&s->mutex); |
654 | return NULL; |
655 | } |
656 | |
657 | |
658 | #endif |
659 | |
/**
 * Split a comma-separated list and append a copy of every item to sources.
 *
 * buf is modified in place: each comma that is consumed is replaced by a
 * NUL byte. Entries are appended starting at index *num_sources, so the
 * function may be called several times on the same array; items that do
 * not fit within max_sources entries are silently dropped.
 *
 * @param buf         NUL-terminated list to split (modified)
 * @param sources     array receiving av_strdup()'ed items
 * @param num_sources in/out: number of entries already stored in sources
 * @param max_sources capacity of sources
 * @return 0 on success, AVERROR(ENOMEM) if an item cannot be duplicated
 */
static int parse_source_list(char *buf, char **sources, int *num_sources,
                             int max_sources)
{
    char *source_start = buf;

    /* Check the capacity before every store: the array may already be
     * full when this function is called a second time. */
    while (*num_sources < max_sources) {
        char *next = strchr(source_start, ',');
        if (next)
            *next = '\0';
        sources[*num_sources] = av_strdup(source_start);
        if (!sources[*num_sources])
            return AVERROR(ENOMEM);
        (*num_sources)++;
        if (!next)
            break;
        /* only advance when there really is a following item */
        source_start = next + 1;
    }
    return 0;
}
680 | |
681 | /* put it in UDP context */ |
682 | /* return non zero if error */ |
683 | static int udp_open(URLContext *h, const char *uri, int flags) |
684 | { |
685 | char hostname[1024], localaddr[1024] = ""; |
686 | int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1; |
687 | UDPContext *s = h->priv_data; |
688 | int is_output; |
689 | const char *p; |
690 | char buf[256]; |
691 | struct sockaddr_storage my_addr; |
692 | socklen_t len; |
693 | int i, num_include_sources = 0, num_exclude_sources = 0; |
694 | char *include_sources[32], *exclude_sources[32]; |
695 | |
696 | h->is_streamed = 1; |
697 | |
698 | is_output = !(flags & AVIO_FLAG_READ); |
699 | if (s->buffer_size < 0) |
700 | s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE; |
701 | |
702 | if (s->sources) { |
703 | if (parse_source_list(s->sources, include_sources, |
704 | &num_include_sources, |
705 | FF_ARRAY_ELEMS(include_sources))) |
706 | goto fail; |
707 | } |
708 | |
709 | if (s->block) { |
710 | if (parse_source_list(s->block, exclude_sources, &num_exclude_sources, |
711 | FF_ARRAY_ELEMS(exclude_sources))) |
712 | goto fail; |
713 | } |
714 | |
715 | if (s->pkt_size > 0) |
716 | h->max_packet_size = s->pkt_size; |
717 | |
718 | p = strchr(uri, '?'); |
719 | if (p) { |
720 | if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) { |
721 | char *endptr = NULL; |
722 | s->reuse_socket = strtol(buf, &endptr, 10); |
723 | /* assume if no digits were found it is a request to enable it */ |
724 | if (buf == endptr) |
725 | s->reuse_socket = 1; |
726 | } |
727 | if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) { |
728 | char *endptr = NULL; |
729 | s->overrun_nonfatal = strtol(buf, &endptr, 10); |
730 | /* assume if no digits were found it is a request to enable it */ |
731 | if (buf == endptr) |
732 | s->overrun_nonfatal = 1; |
733 | if (!HAVE_PTHREAD_CANCEL) |
734 | av_log(h, AV_LOG_WARNING, |
735 | "'overrun_nonfatal' option was set but it is not supported " |
736 | "on this build (pthread support is required)\n"); |
737 | } |
738 | if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) { |
739 | s->ttl = strtol(buf, NULL, 10); |
740 | } |
741 | if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) { |
742 | s->udplite_coverage = strtol(buf, NULL, 10); |
743 | } |
744 | if (av_find_info_tag(buf, sizeof(buf), "localport", p)) { |
745 | s->local_port = strtol(buf, NULL, 10); |
746 | } |
747 | if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) { |
748 | s->pkt_size = strtol(buf, NULL, 10); |
749 | } |
750 | if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) { |
751 | s->buffer_size = strtol(buf, NULL, 10); |
752 | } |
753 | if (av_find_info_tag(buf, sizeof(buf), "connect", p)) { |
754 | s->is_connected = strtol(buf, NULL, 10); |
755 | } |
756 | if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) { |
757 | dscp = strtol(buf, NULL, 10); |
758 | } |
759 | if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) { |
760 | s->circular_buffer_size = strtol(buf, NULL, 10); |
761 | if (!HAVE_PTHREAD_CANCEL) |
762 | av_log(h, AV_LOG_WARNING, |
763 | "'circular_buffer_size' option was set but it is not supported " |
764 | "on this build (pthread support is required)\n"); |
765 | } |
766 | if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) { |
767 | s->bitrate = strtoll(buf, NULL, 10); |
768 | if (!HAVE_PTHREAD_CANCEL) |
769 | av_log(h, AV_LOG_WARNING, |
770 | "'bitrate' option was set but it is not supported " |
771 | "on this build (pthread support is required)\n"); |
772 | } |
773 | if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) { |
774 | s->burst_bits = strtoll(buf, NULL, 10); |
775 | } |
776 | if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { |
777 | av_strlcpy(localaddr, buf, sizeof(localaddr)); |
778 | } |
779 | if (av_find_info_tag(buf, sizeof(buf), "sources", p)) { |
780 | if (parse_source_list(buf, include_sources, &num_include_sources, |
781 | FF_ARRAY_ELEMS(include_sources))) |
782 | goto fail; |
783 | } |
784 | if (av_find_info_tag(buf, sizeof(buf), "block", p)) { |
785 | if (parse_source_list(buf, exclude_sources, &num_exclude_sources, |
786 | FF_ARRAY_ELEMS(exclude_sources))) |
787 | goto fail; |
788 | } |
789 | if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p)) |
790 | s->timeout = strtol(buf, NULL, 10); |
791 | if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p)) |
792 | s->is_broadcast = strtol(buf, NULL, 10); |
793 | } |
794 | /* handling needed to support options picking from both AVOption and URL */ |
795 | s->circular_buffer_size *= 188; |
796 | if (flags & AVIO_FLAG_WRITE) { |
797 | h->max_packet_size = s->pkt_size; |
798 | } else { |
799 | h->max_packet_size = UDP_MAX_PKT_SIZE; |
800 | } |
801 | h->rw_timeout = s->timeout; |
802 | |
803 | /* fill the dest addr */ |
804 | av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri); |
805 | |
806 | /* XXX: fix av_url_split */ |
807 | if (hostname[0] == '\0' || hostname[0] == '?') { |
808 | /* only accepts null hostname if input */ |
809 | if (!(flags & AVIO_FLAG_READ)) |
810 | goto fail; |
811 | } else { |
812 | if (ff_udp_set_remote_url(h, uri) < 0) |
813 | goto fail; |
814 | } |
815 | |
816 | if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ)) |
817 | s->local_port = port; |
818 | |
819 | if (localaddr[0]) |
820 | udp_fd = udp_socket_create(h, &my_addr, &len, localaddr); |
821 | else |
822 | udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr); |
823 | if (udp_fd < 0) |
824 | goto fail; |
825 | |
826 | s->local_addr_storage=my_addr; //store for future multicast join |
827 | |
828 | /* Follow the requested reuse option, unless it's multicast in which |
829 | * case enable reuse unless explicitly disabled. |
830 | */ |
831 | if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) { |
832 | s->reuse_socket = 1; |
833 | if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) |
834 | goto fail; |
835 | } |
836 | |
837 | if (s->is_broadcast) { |
838 | #ifdef SO_BROADCAST |
839 | if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) |
840 | #endif |
841 | goto fail; |
842 | } |
843 | |
844 | /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving. |
845 | * The receiver coverage has to be less than or equal to the sender coverage. |
846 | * Otherwise, the receiver will drop all packets. |
847 | */ |
848 | if (s->udplite_coverage) { |
849 | if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0) |
850 | av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available"); |
851 | |
852 | if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0) |
853 | av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available"); |
854 | } |
855 | |
856 | if (dscp >= 0) { |
857 | dscp <<= 2; |
858 | if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) |
859 | goto fail; |
860 | } |
861 | |
862 | /* If multicast, try binding the multicast address first, to avoid |
863 | * receiving UDP packets from other sources aimed at the same UDP |
864 | * port. This fails on windows. This makes sending to the same address |
865 | * using sendto() fail, so only do it if we're opened in read-only mode. */ |
866 | if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) { |
867 | bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len); |
868 | } |
869 | /* bind to the local address if not multicast or if the multicast |
870 | * bind failed */ |
871 | /* the bind is needed to give a port to the socket now */ |
872 | if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) { |
873 | log_net_error(h, AV_LOG_ERROR, "bind failed"); |
874 | goto fail; |
875 | } |
876 | |
877 | len = sizeof(my_addr); |
878 | getsockname(udp_fd, (struct sockaddr *)&my_addr, &len); |
879 | s->local_port = udp_port(&my_addr, len); |
880 | |
881 | if (s->is_multicast) { |
882 | if (h->flags & AVIO_FLAG_WRITE) { |
883 | /* output */ |
884 | if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0) |
885 | goto fail; |
886 | } |
887 | if (h->flags & AVIO_FLAG_READ) { |
888 | /* input */ |
889 | if (num_include_sources && num_exclude_sources) { |
890 | av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n"); |
891 | goto fail; |
892 | } |
893 | if (num_include_sources) { |
894 | if (udp_set_multicast_sources(h, udp_fd, |
895 | (struct sockaddr *)&s->dest_addr, |
896 | s->dest_addr_len, |
897 | include_sources, |
898 | num_include_sources, 1) < 0) |
899 | goto fail; |
900 | } else { |
901 | if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0) |
902 | goto fail; |
903 | } |
904 | if (num_exclude_sources) { |
905 | if (udp_set_multicast_sources(h, udp_fd, |
906 | (struct sockaddr *)&s->dest_addr, |
907 | s->dest_addr_len, |
908 | exclude_sources, |
909 | num_exclude_sources, 0) < 0) |
910 | goto fail; |
911 | } |
912 | } |
913 | } |
914 | |
915 | if (is_output) { |
916 | /* limit the tx buf size to limit latency */ |
917 | tmp = s->buffer_size; |
918 | if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) { |
919 | log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)"); |
920 | goto fail; |
921 | } |
922 | } else { |
923 | /* set udp recv buffer size to the requested value (default 64K) */ |
924 | tmp = s->buffer_size; |
925 | if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) { |
926 | log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)"); |
927 | } |
928 | len = sizeof(tmp); |
929 | if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) { |
930 | log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)"); |
931 | } else { |
932 | av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp); |
933 | if(tmp < s->buffer_size) |
934 | av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp); |
935 | } |
936 | |
937 | /* make the socket non-blocking */ |
938 | ff_socket_nonblock(udp_fd, 1); |
939 | } |
940 | if (s->is_connected) { |
941 | if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) { |
942 | log_net_error(h, AV_LOG_ERROR, "connect"); |
943 | goto fail; |
944 | } |
945 | } |
946 | |
947 | for (i = 0; i < num_include_sources; i++) |
948 | av_freep(&include_sources[i]); |
949 | for (i = 0; i < num_exclude_sources; i++) |
950 | av_freep(&exclude_sources[i]); |
951 | |
952 | s->udp_fd = udp_fd; |
953 | |
954 | #if HAVE_PTHREAD_CANCEL |
955 | /* |
956 | Create thread in case of: |
957 | 1. Input and circular_buffer_size is set |
958 | 2. Output and bitrate and circular_buffer_size is set |
959 | */ |
960 | |
961 | if (is_output && s->bitrate && !s->circular_buffer_size) { |
962 | /* Warn user in case of 'circular_buffer_size' is not set */ |
963 | av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n"); |
964 | } |
965 | |
966 | if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) { |
967 | int ret; |
968 | |
969 | /* start the task going */ |
970 | s->fifo = av_fifo_alloc(s->circular_buffer_size); |
971 | ret = pthread_mutex_init(&s->mutex, NULL); |
972 | if (ret != 0) { |
973 | av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret)); |
974 | goto fail; |
975 | } |
976 | ret = pthread_cond_init(&s->cond, NULL); |
977 | if (ret != 0) { |
978 | av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); |
979 | goto cond_fail; |
980 | } |
981 | ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h); |
982 | if (ret != 0) { |
983 | av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); |
984 | goto thread_fail; |
985 | } |
986 | s->thread_started = 1; |
987 | } |
988 | #endif |
989 | |
990 | return 0; |
991 | #if HAVE_PTHREAD_CANCEL |
992 | thread_fail: |
993 | pthread_cond_destroy(&s->cond); |
994 | cond_fail: |
995 | pthread_mutex_destroy(&s->mutex); |
996 | #endif |
997 | fail: |
998 | if (udp_fd >= 0) |
999 | closesocket(udp_fd); |
1000 | av_fifo_freep(&s->fifo); |
1001 | for (i = 0; i < num_include_sources; i++) |
1002 | av_freep(&include_sources[i]); |
1003 | for (i = 0; i < num_exclude_sources; i++) |
1004 | av_freep(&exclude_sources[i]); |
1005 | return AVERROR(EIO); |
1006 | } |
1007 | |
1008 | static int udplite_open(URLContext *h, const char *uri, int flags) |
1009 | { |
1010 | UDPContext *s = h->priv_data; |
1011 | |
1012 | // set default checksum coverage |
1013 | s->udplite_coverage = UDP_HEADER_SIZE; |
1014 | |
1015 | return udp_open(h, uri, flags); |
1016 | } |
1017 | |
1018 | static int udp_read(URLContext *h, uint8_t *buf, int size) |
1019 | { |
1020 | UDPContext *s = h->priv_data; |
1021 | int ret; |
1022 | #if HAVE_PTHREAD_CANCEL |
1023 | int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; |
1024 | |
1025 | if (s->fifo) { |
1026 | pthread_mutex_lock(&s->mutex); |
1027 | do { |
1028 | avail = av_fifo_size(s->fifo); |
1029 | if (avail) { // >=size) { |
1030 | uint8_t tmp[4]; |
1031 | |
1032 | av_fifo_generic_read(s->fifo, tmp, 4, NULL); |
1033 | avail= AV_RL32(tmp); |
1034 | if(avail > size){ |
1035 | av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); |
1036 | avail= size; |
1037 | } |
1038 | |
1039 | av_fifo_generic_read(s->fifo, buf, avail, NULL); |
1040 | av_fifo_drain(s->fifo, AV_RL32(tmp) - avail); |
1041 | pthread_mutex_unlock(&s->mutex); |
1042 | return avail; |
1043 | } else if(s->circular_buffer_error){ |
1044 | int err = s->circular_buffer_error; |
1045 | pthread_mutex_unlock(&s->mutex); |
1046 | return err; |
1047 | } else if(nonblock) { |
1048 | pthread_mutex_unlock(&s->mutex); |
1049 | return AVERROR(EAGAIN); |
1050 | } |
1051 | else { |
1052 | /* FIXME: using the monotonic clock would be better, |
1053 | but it does not exist on all supported platforms. */ |
1054 | int64_t t = av_gettime() + 100000; |
1055 | struct timespec tv = { .tv_sec = t / 1000000, |
1056 | .tv_nsec = (t % 1000000) * 1000 }; |
1057 | if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) { |
1058 | pthread_mutex_unlock(&s->mutex); |
1059 | return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno); |
1060 | } |
1061 | nonblock = 1; |
1062 | } |
1063 | } while( 1); |
1064 | } |
1065 | #endif |
1066 | |
1067 | if (!(h->flags & AVIO_FLAG_NONBLOCK)) { |
1068 | ret = ff_network_wait_fd(s->udp_fd, 0); |
1069 | if (ret < 0) |
1070 | return ret; |
1071 | } |
1072 | ret = recv(s->udp_fd, buf, size, 0); |
1073 | |
1074 | return ret < 0 ? ff_neterrno() : ret; |
1075 | } |
1076 | |
1077 | static int udp_write(URLContext *h, const uint8_t *buf, int size) |
1078 | { |
1079 | UDPContext *s = h->priv_data; |
1080 | int ret; |
1081 | |
1082 | #if HAVE_PTHREAD_CANCEL |
1083 | if (s->fifo) { |
1084 | uint8_t tmp[4]; |
1085 | |
1086 | pthread_mutex_lock(&s->mutex); |
1087 | |
1088 | /* |
1089 | Return error if last tx failed. |
1090 | Here we can't know on which packet error was, but it needs to know that error exists. |
1091 | */ |
1092 | if (s->circular_buffer_error<0) { |
1093 | int err=s->circular_buffer_error; |
1094 | pthread_mutex_unlock(&s->mutex); |
1095 | return err; |
1096 | } |
1097 | |
1098 | if(av_fifo_space(s->fifo) < size + 4) { |
1099 | /* What about a partial packet tx ? */ |
1100 | pthread_mutex_unlock(&s->mutex); |
1101 | return AVERROR(ENOMEM); |
1102 | } |
1103 | AV_WL32(tmp, size); |
1104 | av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */ |
1105 | av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */ |
1106 | pthread_cond_signal(&s->cond); |
1107 | pthread_mutex_unlock(&s->mutex); |
1108 | return size; |
1109 | } |
1110 | #endif |
1111 | if (!(h->flags & AVIO_FLAG_NONBLOCK)) { |
1112 | ret = ff_network_wait_fd(s->udp_fd, 1); |
1113 | if (ret < 0) |
1114 | return ret; |
1115 | } |
1116 | |
1117 | if (!s->is_connected) { |
1118 | ret = sendto (s->udp_fd, buf, size, 0, |
1119 | (struct sockaddr *) &s->dest_addr, |
1120 | s->dest_addr_len); |
1121 | } else |
1122 | ret = send(s->udp_fd, buf, size, 0); |
1123 | |
1124 | return ret < 0 ? ff_neterrno() : ret; |
1125 | } |
1126 | |
1127 | static int udp_close(URLContext *h) |
1128 | { |
1129 | UDPContext *s = h->priv_data; |
1130 | |
1131 | #if HAVE_PTHREAD_CANCEL |
1132 | // Request close once writing is finished |
1133 | if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) { |
1134 | pthread_mutex_lock(&s->mutex); |
1135 | s->close_req = 1; |
1136 | pthread_cond_signal(&s->cond); |
1137 | pthread_mutex_unlock(&s->mutex); |
1138 | } |
1139 | #endif |
1140 | |
1141 | if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) |
1142 | udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage); |
1143 | #if HAVE_PTHREAD_CANCEL |
1144 | if (s->thread_started) { |
1145 | int ret; |
1146 | // Cancel only read, as write has been signaled as success to the user |
1147 | if (h->flags & AVIO_FLAG_READ) |
1148 | pthread_cancel(s->circular_buffer_thread); |
1149 | ret = pthread_join(s->circular_buffer_thread, NULL); |
1150 | if (ret != 0) |
1151 | av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret)); |
1152 | pthread_mutex_destroy(&s->mutex); |
1153 | pthread_cond_destroy(&s->cond); |
1154 | } |
1155 | #endif |
1156 | closesocket(s->udp_fd); |
1157 | av_fifo_freep(&s->fifo); |
1158 | return 0; |
1159 | } |
1160 | |
1161 | const URLProtocol ff_udp_protocol = { |
1162 | .name = "udp", |
1163 | .url_open = udp_open, |
1164 | .url_read = udp_read, |
1165 | .url_write = udp_write, |
1166 | .url_close = udp_close, |
1167 | .url_get_file_handle = udp_get_file_handle, |
1168 | .priv_data_size = sizeof(UDPContext), |
1169 | .priv_data_class = &udp_class, |
1170 | .flags = URL_PROTOCOL_FLAG_NETWORK, |
1171 | }; |
1172 | |
1173 | const URLProtocol ff_udplite_protocol = { |
1174 | .name = "udplite", |
1175 | .url_open = udplite_open, |
1176 | .url_read = udp_read, |
1177 | .url_write = udp_write, |
1178 | .url_close = udp_close, |
1179 | .url_get_file_handle = udp_get_file_handle, |
1180 | .priv_data_size = sizeof(UDPContext), |
1181 | .priv_data_class = &udplite_context_class, |
1182 | .flags = URL_PROTOCOL_FLAG_NETWORK, |
1183 | }; |
1184 |