summaryrefslogtreecommitdiff
path: root/libavformat/fifo.c (plain)
blob: 2cbe5c56af8f340248e25e25ac74d2e7cd86f33d
1/*
2 * FIFO pseudo-muxer
3 * Copyright (c) 2016 Jan Sebechlebsky
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22#include "libavutil/avassert.h"
23#include "libavutil/opt.h"
24#include "libavutil/time.h"
25#include "libavutil/thread.h"
26#include "libavutil/threadmessage.h"
27#include "avformat.h"
28#include "internal.h"
29
30#define FIFO_DEFAULT_QUEUE_SIZE 60
31#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
32#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
33
34typedef struct FifoContext {
35 const AVClass *class;
36 AVFormatContext *avf;
37
38 char *format;
39 char *format_options_str;
40 AVDictionary *format_options;
41
42 int queue_size;
43 AVThreadMessageQueue *queue;
44
45 pthread_t writer_thread;
46
47 /* Return value of last write_trailer_call */
48 int write_trailer_ret;
49
50 /* Time to wait before next recovery attempt
51 * This can refer to the time in processed stream,
52 * or real time. */
53 int64_t recovery_wait_time;
54
55 /* Maximal number of unsuccessful successive recovery attempts */
56 int max_recovery_attempts;
57
58 /* Whether to attempt recovery from failure */
59 int attempt_recovery;
60
61 /* If >0 stream time will be used when waiting
62 * for the recovery attempt instead of real time */
63 int recovery_wait_streamtime;
64
65 /* If >0 recovery will be attempted regardless of error code
66 * (except AVERROR_EXIT, so exit request is never ignored) */
67 int recover_any_error;
68
69 /* Whether to drop packets in case the queue is full. */
70 int drop_pkts_on_overflow;
71
72 /* Whether to wait for keyframe when recovering
73 * from failure or queue overflow */
74 int restart_with_keyframe;
75
76 pthread_mutex_t overflow_flag_lock;
77 int overflow_flag_lock_initialized;
78 /* Value > 0 signals queue overflow */
79 volatile uint8_t overflow_flag;
80
81} FifoContext;
82
83typedef struct FifoThreadContext {
84 AVFormatContext *avf;
85
86 /* Timestamp of last failure.
87 * This is either pts in case stream time is used,
88 * or microseconds as returned by av_getttime_relative() */
89 int64_t last_recovery_ts;
90
91 /* Number of current recovery process
92 * Value > 0 means we are in recovery process */
93 int recovery_nr;
94
95 /* If > 0 all frames will be dropped until keyframe is received */
96 uint8_t drop_until_keyframe;
97
98 /* Value > 0 means that the previous write_header call was successful
99 * so finalization by calling write_trailer and ff_io_close must be done
100 * before exiting / reinitialization of underlying muxer */
101 uint8_t header_written;
102} FifoThreadContext;
103
104typedef enum FifoMessageType {
105 FIFO_WRITE_HEADER,
106 FIFO_WRITE_PACKET,
107 FIFO_FLUSH_OUTPUT
108} FifoMessageType;
109
110typedef struct FifoMessage {
111 FifoMessageType type;
112 AVPacket pkt;
113} FifoMessage;
114
115static int fifo_thread_write_header(FifoThreadContext *ctx)
116{
117 AVFormatContext *avf = ctx->avf;
118 FifoContext *fifo = avf->priv_data;
119 AVFormatContext *avf2 = fifo->avf;
120 AVDictionary *format_options = NULL;
121 int ret, i;
122
123 ret = av_dict_copy(&format_options, fifo->format_options, 0);
124 if (ret < 0)
125 return ret;
126
127 ret = ff_format_output_open(avf2, avf->filename, &format_options);
128 if (ret < 0) {
129 av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
130 av_err2str(ret));
131 goto end;
132 }
133
134 for (i = 0;i < avf2->nb_streams; i++)
135 avf2->streams[i]->cur_dts = 0;
136
137 ret = avformat_write_header(avf2, &format_options);
138 if (!ret)
139 ctx->header_written = 1;
140
141 // Check for options unrecognized by underlying muxer
142 if (format_options) {
143 AVDictionaryEntry *entry = NULL;
144 while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
145 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
146 ret = AVERROR(EINVAL);
147 }
148
149end:
150 av_dict_free(&format_options);
151 return ret;
152}
153
154static int fifo_thread_flush_output(FifoThreadContext *ctx)
155{
156 AVFormatContext *avf = ctx->avf;
157 FifoContext *fifo = avf->priv_data;
158 AVFormatContext *avf2 = fifo->avf;
159
160 return av_write_frame(avf2, NULL);
161}
162
163static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
164{
165 AVFormatContext *avf = ctx->avf;
166 FifoContext *fifo = avf->priv_data;
167 AVFormatContext *avf2 = fifo->avf;
168 AVRational src_tb, dst_tb;
169 int ret, s_idx;
170
171 if (ctx->drop_until_keyframe) {
172 if (pkt->flags & AV_PKT_FLAG_KEY) {
173 ctx->drop_until_keyframe = 0;
174 av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
175 } else {
176 av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
177 av_packet_unref(pkt);
178 return 0;
179 }
180 }
181
182 s_idx = pkt->stream_index;
183 src_tb = avf->streams[s_idx]->time_base;
184 dst_tb = avf2->streams[s_idx]->time_base;
185 av_packet_rescale_ts(pkt, src_tb, dst_tb);
186
187 ret = av_write_frame(avf2, pkt);
188 if (ret >= 0)
189 av_packet_unref(pkt);
190 return ret;
191}
192
193static int fifo_thread_write_trailer(FifoThreadContext *ctx)
194{
195 AVFormatContext *avf = ctx->avf;
196 FifoContext *fifo = avf->priv_data;
197 AVFormatContext *avf2 = fifo->avf;
198 int ret;
199
200 if (!ctx->header_written)
201 return 0;
202
203 ret = av_write_trailer(avf2);
204 ff_format_io_close(avf2, &avf2->pb);
205
206 return ret;
207}
208
209static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
210{
211 int ret = AVERROR(EINVAL);
212
213 if (!ctx->header_written) {
214 ret = fifo_thread_write_header(ctx);
215 if (ret < 0)
216 return ret;
217 }
218
219 switch(msg->type) {
220 case FIFO_WRITE_HEADER:
221 av_assert0(ret >= 0);
222 return ret;
223 case FIFO_WRITE_PACKET:
224 return fifo_thread_write_packet(ctx, &msg->pkt);
225 case FIFO_FLUSH_OUTPUT:
226 return fifo_thread_flush_output(ctx);
227 }
228
229 av_assert0(0);
230 return AVERROR(EINVAL);
231}
232
233static int is_recoverable(const FifoContext *fifo, int err_no) {
234 if (!fifo->attempt_recovery)
235 return 0;
236
237 if (fifo->recover_any_error)
238 return err_no != AVERROR_EXIT;
239
240 switch (err_no) {
241 case AVERROR(EINVAL):
242 case AVERROR(ENOSYS):
243 case AVERROR_EOF:
244 case AVERROR_EXIT:
245 case AVERROR_PATCHWELCOME:
246 return 0;
247 default:
248 return 1;
249 }
250}
251
252static void free_message(void *msg)
253{
254 FifoMessage *fifo_msg = msg;
255
256 if (fifo_msg->type == FIFO_WRITE_PACKET)
257 av_packet_unref(&fifo_msg->pkt);
258}
259
260static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
261 int err_no)
262{
263 AVFormatContext *avf = ctx->avf;
264 FifoContext *fifo = avf->priv_data;
265 int ret;
266
267 av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
268 av_err2str(err_no));
269
270 if (fifo->recovery_wait_streamtime) {
271 if (pkt->pts == AV_NOPTS_VALUE)
272 av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
273 " timestamp, recovery will be attempted immediately");
274 ctx->last_recovery_ts = pkt->pts;
275 } else {
276 ctx->last_recovery_ts = av_gettime_relative();
277 }
278
279 if (fifo->max_recovery_attempts &&
280 ctx->recovery_nr >= fifo->max_recovery_attempts) {
281 av_log(avf, AV_LOG_ERROR,
282 "Maximal number of %d recovery attempts reached.\n",
283 fifo->max_recovery_attempts);
284 ret = err_no;
285 } else {
286 ret = AVERROR(EAGAIN);
287 }
288
289 return ret;
290}
291
292static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
293{
294 AVFormatContext *avf = ctx->avf;
295 FifoContext *fifo = avf->priv_data;
296 AVPacket *pkt = &msg->pkt;
297 int64_t time_since_recovery;
298 int ret;
299
300 if (!is_recoverable(fifo, err_no)) {
301 ret = err_no;
302 goto fail;
303 }
304
305 if (ctx->header_written) {
306 fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
307 ctx->header_written = 0;
308 }
309
310 if (!ctx->recovery_nr) {
311 ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
312 AV_NOPTS_VALUE : 0;
313 } else {
314 if (fifo->recovery_wait_streamtime) {
315 if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
316 AVRational tb = avf->streams[pkt->stream_index]->time_base;
317 time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
318 tb, AV_TIME_BASE_Q);
319 } else {
320 /* Enforce recovery immediately */
321 time_since_recovery = fifo->recovery_wait_time;
322 }
323 } else {
324 time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
325 }
326
327 if (time_since_recovery < fifo->recovery_wait_time)
328 return AVERROR(EAGAIN);
329 }
330
331 ctx->recovery_nr++;
332
333 if (fifo->max_recovery_attempts) {
334 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
335 ctx->recovery_nr, fifo->max_recovery_attempts);
336 } else {
337 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
338 ctx->recovery_nr);
339 }
340
341 if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
342 ctx->drop_until_keyframe = 1;
343
344 ret = fifo_thread_dispatch_message(ctx, msg);
345 if (ret < 0) {
346 if (is_recoverable(fifo, ret)) {
347 return fifo_thread_process_recovery_failure(ctx, pkt, ret);
348 } else {
349 goto fail;
350 }
351 } else {
352 av_log(avf, AV_LOG_INFO, "Recovery successful\n");
353 ctx->recovery_nr = 0;
354 }
355
356 return 0;
357
358fail:
359 free_message(msg);
360 return ret;
361}
362
363static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
364{
365 AVFormatContext *avf = ctx->avf;
366 FifoContext *fifo = avf->priv_data;
367 int ret;
368
369 do {
370 if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
371 int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
372 int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
373 if (time_to_wait)
374 av_usleep(FFMIN(10000, time_to_wait));
375 }
376
377 ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
378 } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
379
380 if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
381 if (msg->type == FIFO_WRITE_PACKET)
382 av_packet_unref(&msg->pkt);
383 ret = 0;
384 }
385
386 return ret;
387}
388
389static void *fifo_consumer_thread(void *data)
390{
391 AVFormatContext *avf = data;
392 FifoContext *fifo = avf->priv_data;
393 AVThreadMessageQueue *queue = fifo->queue;
394 FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
395 int ret;
396
397 FifoThreadContext fifo_thread_ctx;
398 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
399 fifo_thread_ctx.avf = avf;
400
401 while (1) {
402 uint8_t just_flushed = 0;
403
404 if (!fifo_thread_ctx.recovery_nr)
405 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
406
407 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
408 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
409 if (rec_ret < 0) {
410 av_thread_message_queue_set_err_send(queue, rec_ret);
411 break;
412 }
413 }
414
415 /* If the queue is full at the moment when fifo_write_packet
416 * attempts to insert new message (packet) to the queue,
417 * it sets the fifo->overflow_flag to 1 and drops packet.
418 * Here in consumer thread, the flag is checked and if it is
419 * set, the queue is flushed and flag cleared. */
420 pthread_mutex_lock(&fifo->overflow_flag_lock);
421 if (fifo->overflow_flag) {
422 av_thread_message_flush(queue);
423 if (fifo->restart_with_keyframe)
424 fifo_thread_ctx.drop_until_keyframe = 1;
425 fifo->overflow_flag = 0;
426 just_flushed = 1;
427 }
428 pthread_mutex_unlock(&fifo->overflow_flag_lock);
429
430 if (just_flushed)
431 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
432
433 ret = av_thread_message_queue_recv(queue, &msg, 0);
434 if (ret < 0) {
435 av_thread_message_queue_set_err_send(queue, ret);
436 break;
437 }
438 }
439
440 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
441
442 return NULL;
443}
444
445static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat)
446{
447 FifoContext *fifo = avf->priv_data;
448 AVFormatContext *avf2;
449 int ret = 0, i;
450
451 ret = avformat_alloc_output_context2(&avf2, oformat, NULL, NULL);
452 if (ret < 0)
453 return ret;
454
455 fifo->avf = avf2;
456
457 avf2->interrupt_callback = avf->interrupt_callback;
458 avf2->max_delay = avf->max_delay;
459 ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
460 if (ret < 0)
461 return ret;
462 avf2->opaque = avf->opaque;
463 avf2->io_close = avf->io_close;
464 avf2->io_open = avf->io_open;
465 avf2->flags = avf->flags;
466
467 for (i = 0; i < avf->nb_streams; ++i) {
468 AVStream *st = avformat_new_stream(avf2, NULL);
469 if (!st)
470 return AVERROR(ENOMEM);
471
472 ret = ff_stream_encode_params_copy(st, avf->streams[i]);
473 if (ret < 0)
474 return ret;
475 }
476
477 return 0;
478}
479
480static int fifo_init(AVFormatContext *avf)
481{
482 FifoContext *fifo = avf->priv_data;
483 AVOutputFormat *oformat;
484 int ret = 0;
485
486 if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
487 av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
488 " only when drop_pkts_on_overflow is also turned on\n");
489 return AVERROR(EINVAL);
490 }
491
492 if (fifo->format_options_str) {
493 ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
494 "=", ":", 0);
495 if (ret < 0) {
496 av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
497 fifo->format_options_str);
498 return ret;
499 }
500 }
501
502 oformat = av_guess_format(fifo->format, avf->filename, NULL);
503 if (!oformat) {
504 ret = AVERROR_MUXER_NOT_FOUND;
505 return ret;
506 }
507
508 ret = fifo_mux_init(avf, oformat);
509 if (ret < 0)
510 return ret;
511
512 ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
513 sizeof(FifoMessage));
514 if (ret < 0)
515 return ret;
516
517 av_thread_message_queue_set_free_func(fifo->queue, free_message);
518
519 ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
520 if (ret < 0)
521 return AVERROR(ret);
522 fifo->overflow_flag_lock_initialized = 1;
523
524 return 0;
525}
526
527static int fifo_write_header(AVFormatContext *avf)
528{
529 FifoContext * fifo = avf->priv_data;
530 int ret;
531
532 ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
533 if (ret) {
534 av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
535 av_err2str(AVERROR(ret)));
536 ret = AVERROR(ret);
537 }
538
539 return ret;
540}
541
542static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
543{
544 FifoContext *fifo = avf->priv_data;
545 FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
546 int ret;
547
548 if (pkt) {
549 av_init_packet(&msg.pkt);
550 ret = av_packet_ref(&msg.pkt,pkt);
551 if (ret < 0)
552 return ret;
553 }
554
555 ret = av_thread_message_queue_send(fifo->queue, &msg,
556 fifo->drop_pkts_on_overflow ?
557 AV_THREAD_MESSAGE_NONBLOCK : 0);
558 if (ret == AVERROR(EAGAIN)) {
559 uint8_t overflow_set = 0;
560
561 /* Queue is full, set fifo->overflow_flag to 1
562 * to let consumer thread know the queue should
563 * be flushed. */
564 pthread_mutex_lock(&fifo->overflow_flag_lock);
565 if (!fifo->overflow_flag)
566 fifo->overflow_flag = overflow_set = 1;
567 pthread_mutex_unlock(&fifo->overflow_flag_lock);
568
569 if (overflow_set)
570 av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
571 ret = 0;
572 goto fail;
573 } else if (ret < 0) {
574 goto fail;
575 }
576
577 return ret;
578fail:
579 if (pkt)
580 av_packet_unref(&msg.pkt);
581 return ret;
582}
583
584static int fifo_write_trailer(AVFormatContext *avf)
585{
586 FifoContext *fifo= avf->priv_data;
587 int ret;
588
589 av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
590
591 ret = pthread_join(fifo->writer_thread, NULL);
592 if (ret < 0) {
593 av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
594 av_err2str(AVERROR(ret)));
595 return AVERROR(ret);
596 }
597
598 ret = fifo->write_trailer_ret;
599 return ret;
600}
601
602static void fifo_deinit(AVFormatContext *avf)
603{
604 FifoContext *fifo = avf->priv_data;
605
606 av_dict_free(&fifo->format_options);
607 avformat_free_context(fifo->avf);
608 av_thread_message_queue_free(&fifo->queue);
609 if (fifo->overflow_flag_lock_initialized)
610 pthread_mutex_destroy(&fifo->overflow_flag_lock);
611}
612
613#define OFFSET(x) offsetof(FifoContext, x)
614static const AVOption options[] = {
615 {"fifo_format", "Target muxer", OFFSET(format),
616 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
617
618 {"queue_size", "Size of fifo queue", OFFSET(queue_size),
619 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
620
621 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
622 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
623
624 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
625 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
626
627 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
628 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
629
630 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
631 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
632
633 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
634 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
635
636 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
637 AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
638
639 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
640 OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
641
642 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
643 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
644
645 {NULL},
646};
647
648static const AVClass fifo_muxer_class = {
649 .class_name = "Fifo muxer",
650 .item_name = av_default_item_name,
651 .option = options,
652 .version = LIBAVUTIL_VERSION_INT,
653};
654
655AVOutputFormat ff_fifo_muxer = {
656 .name = "fifo",
657 .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
658 .priv_data_size = sizeof(FifoContext),
659 .init = fifo_init,
660 .write_header = fifo_write_header,
661 .write_packet = fifo_write_packet,
662 .write_trailer = fifo_write_trailer,
663 .deinit = fifo_deinit,
664 .priv_class = &fifo_muxer_class,
665 .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
666};
667