blob: 0ba87b5929f4b7f79b4355e02ec1dcc04ea31c64
1 | /* |
2 | * This file is part of FFmpeg. |
3 | * |
4 | * FFmpeg is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU Lesser General Public |
6 | * License as published by the Free Software Foundation; either |
7 | * version 2.1 of the License, or (at your option) any later version. |
8 | * |
9 | * FFmpeg is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
12 | * Lesser General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU Lesser General Public |
15 | * License along with FFmpeg; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
17 | */ |
18 | |
19 | /** |
20 | * @file |
21 | * Frame multithreading support functions |
22 | * @see doc/multithreading.txt |
23 | */ |
24 | |
25 | #include "config.h" |
26 | |
27 | #include <stdatomic.h> |
28 | #include <stdint.h> |
29 | |
30 | #include "avcodec.h" |
31 | #include "hwaccel.h" |
32 | #include "internal.h" |
33 | #include "pthread_internal.h" |
34 | #include "thread.h" |
35 | #include "version.h" |
36 | |
37 | #include "libavutil/avassert.h" |
38 | #include "libavutil/buffer.h" |
39 | #include "libavutil/common.h" |
40 | #include "libavutil/cpu.h" |
41 | #include "libavutil/frame.h" |
42 | #include "libavutil/internal.h" |
43 | #include "libavutil/log.h" |
44 | #include "libavutil/mem.h" |
45 | #include "libavutil/opt.h" |
46 | #include "libavutil/thread.h" |
47 | |
/**
 * States a frame-decoding worker can be in; stored in PerThreadContext.state.
 */
enum {
    /// Set when the thread is awaiting a packet.
    STATE_INPUT_READY,
    /// Set before the codec has called ff_thread_finish_setup().
    STATE_SETTING_UP,
    /**
     * Set when the codec calls get_buffer().
     * State is returned to STATE_SETTING_UP afterwards.
     */
    STATE_GET_BUFFER,
    /**
     * Set when the codec calls get_format().
     * State is returned to STATE_SETTING_UP afterwards.
     */
    STATE_GET_FORMAT,
    /// Set after the codec has called ff_thread_finish_setup().
    STATE_SETUP_FINISHED,
};
66 | |
/**
 * Context used by codec threads and stored in their AVCodecInternal thread_ctx.
 */
typedef struct PerThreadContext {
    struct FrameThreadContext *parent; ///< Shared frame-threading context this worker belongs to.

    pthread_t      thread;        ///< Handle of the worker thread; meaningful only while thread_init is set.
    int            thread_init;   ///< Nonzero once pthread_create() succeeded for this worker.
    pthread_cond_t input_cond;    ///< Used to wait for a new packet from the main thread.
    pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change.
    pthread_cond_t output_cond;   ///< Used by the main thread to wait for frames to finish.

    pthread_mutex_t mutex;          ///< Mutex used to protect the contents of the PerThreadContext.
    pthread_mutex_t progress_mutex; ///< Mutex used to protect frame progress values and progress_cond.

    AVCodecContext *avctx;          ///< Context used to decode packets passed to this thread.

    AVPacket       avpkt;           ///< Input packet (for decoding) or output (for encoding).

    AVFrame *frame;                 ///< Output frame (for decoding) or input (for encoding).
    int     got_frame;              ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
    int     result;                 ///< The result of the last codec decode/encode() call.

    atomic_int state;               ///< Current worker state; holds one of the STATE_* values.

    /**
     * Array of frames passed to ff_thread_release_buffer().
     * Frames are released after all threads referencing them are finished.
     */
    AVFrame *released_buffers;
    int  num_released_buffers;       ///< Number of valid entries in released_buffers.
    int      released_buffers_allocated; ///< Allocated size of released_buffers as tracked by av_fast_realloc().

    AVFrame *requested_frame;       ///< AVFrame the codec passed to get_buffer()
    int      requested_flags;       ///< flags passed to get_buffer() for requested_frame

    const enum AVPixelFormat *available_formats; ///< Format array for get_format()
    enum AVPixelFormat result_format;            ///< get_format() result

    int die;                        ///< Set when the thread should exit.

    int hwaccel_serializing;        ///< Nonzero while this worker holds the parent's hwaccel_mutex.
    int async_serializing;          ///< Nonzero while this worker holds the parent's async lock.
} PerThreadContext;
111 | |
/**
 * Context stored in the client AVCodecInternal thread_ctx.
 */
typedef struct FrameThreadContext {
    PerThreadContext *threads;     ///< The contexts for each thread.
    PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.

    pthread_mutex_t buffer_mutex;  ///< Mutex used to protect get/release_buffer().
    /**
     * This lock is used for ensuring threads run in serial when hwaccel
     * is used.
     */
    pthread_mutex_t hwaccel_mutex;
    pthread_mutex_t async_mutex;   ///< Protects async_lock.
    pthread_cond_t async_cond;     ///< Broadcast whenever async_lock is cleared.
    int async_lock;                ///< Nonzero while the async lock is held; see async_lock()/async_unlock().

    int next_decoding;             ///< The next context to submit a packet to.
    int next_finished;             ///< The next context to return output from.

    int delaying;                  /**<
                                    * Set for the first N packets, where N is the number of threads.
                                    * While it is set, ff_thread_en/decode_frame won't return any results.
                                    */
} FrameThreadContext;
137 | |
/**
 * Nonzero when the context's thread_safe_callbacks flag is set or its
 * get_buffer2 callback is avcodec_default_get_buffer2.
 */
#define THREAD_SAFE_CALLBACKS(avctx) \
((avctx)->thread_safe_callbacks || (avctx)->get_buffer2 == avcodec_default_get_buffer2)
140 | |
141 | static void async_lock(FrameThreadContext *fctx) |
142 | { |
143 | pthread_mutex_lock(&fctx->async_mutex); |
144 | while (fctx->async_lock) |
145 | pthread_cond_wait(&fctx->async_cond, &fctx->async_mutex); |
146 | fctx->async_lock = 1; |
147 | pthread_mutex_unlock(&fctx->async_mutex); |
148 | } |
149 | |
150 | static void async_unlock(FrameThreadContext *fctx) |
151 | { |
152 | pthread_mutex_lock(&fctx->async_mutex); |
153 | av_assert0(fctx->async_lock); |
154 | fctx->async_lock = 0; |
155 | pthread_cond_broadcast(&fctx->async_cond); |
156 | pthread_mutex_unlock(&fctx->async_mutex); |
157 | } |
158 | |
/**
 * Codec worker thread.
 *
 * Automatically calls ff_thread_finish_setup() if the codec does
 * not provide an update_thread_context method, or if the codec returns
 * before calling it.
 *
 * @param arg the PerThreadContext served by this worker
 * @return always NULL
 */
static attribute_align_arg void *frame_worker_thread(void *arg)
{
    PerThreadContext *p = arg;
    AVCodecContext *avctx = p->avctx;
    const AVCodec *codec = avctx->codec;

    /* p->mutex stays locked for the whole loop, except while this thread
     * is blocked in pthread_cond_wait() on input_cond. */
    pthread_mutex_lock(&p->mutex);
    while (1) {
        /* Sleep until a packet has been submitted or we are told to exit. */
        while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
            pthread_cond_wait(&p->input_cond, &p->mutex);

        if (p->die) break;

        /* No per-thread context to hand over and callbacks are safe to run
         * here: setup can be declared finished before decoding starts. */
        if (!codec->update_thread_context && THREAD_SAFE_CALLBACKS(avctx))
            ff_thread_finish_setup(avctx);

        /* If a decoder supports hwaccel, then it must call ff_get_format().
         * Since that call must happen before ff_thread_finish_setup(), the
         * decoder is required to implement update_thread_context() and call
         * ff_thread_finish_setup() manually. Therefore the above
         * ff_thread_finish_setup() call did not happen and hwaccel_serializing
         * cannot be true here. */
        av_assert0(!p->hwaccel_serializing);

        /* if the previous thread uses hwaccel then we take the lock to ensure
         * the threads don't run concurrently */
        if (avctx->hwaccel) {
            pthread_mutex_lock(&p->parent->hwaccel_mutex);
            p->hwaccel_serializing = 1;
        }

        av_frame_unref(p->frame);
        p->got_frame = 0;
        p->result = codec->decode(avctx, p->frame, &p->got_frame, &p->avpkt);

        /* Drop any buffer the decoder left attached although it produced
         * no frame. */
        if ((p->result < 0 || !p->got_frame) && p->frame->buf[0]) {
            if (avctx->internal->allocate_progress)
                av_log(avctx, AV_LOG_ERROR, "A frame threaded decoder did not "
                       "free the frame on failure. This is a bug, please report it.\n");
            av_frame_unref(p->frame);
        }

        /* The codec returned without ever finishing setup; do it for it. */
        if (atomic_load(&p->state) == STATE_SETTING_UP)
            ff_thread_finish_setup(avctx);

        if (p->hwaccel_serializing) {
            p->hwaccel_serializing = 0;
            pthread_mutex_unlock(&p->parent->hwaccel_mutex);
        }

        if (p->async_serializing) {
            p->async_serializing = 0;

            async_unlock(p->parent);
        }

        pthread_mutex_lock(&p->progress_mutex);

        /* Publish completion: the state change is made under progress_mutex
         * and announced on both progress_cond and output_cond. */
        atomic_store(&p->state, STATE_INPUT_READY);

        pthread_cond_broadcast(&p->progress_cond);
        pthread_cond_signal(&p->output_cond);
        pthread_mutex_unlock(&p->progress_mutex);
    }
    pthread_mutex_unlock(&p->mutex);

    return NULL;
}
234 | |
235 | /** |
236 | * Update the next thread's AVCodecContext with values from the reference thread's context. |
237 | * |
238 | * @param dst The destination context. |
239 | * @param src The source context. |
240 | * @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread |
241 | * @return 0 on success, negative error code on failure |
242 | */ |
243 | static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, int for_user) |
244 | { |
245 | int err = 0; |
246 | |
247 | if (dst != src && (for_user || !(av_codec_get_codec_descriptor(src)->props & AV_CODEC_PROP_INTRA_ONLY))) { |
248 | dst->time_base = src->time_base; |
249 | dst->framerate = src->framerate; |
250 | dst->width = src->width; |
251 | dst->height = src->height; |
252 | dst->pix_fmt = src->pix_fmt; |
253 | dst->sw_pix_fmt = src->sw_pix_fmt; |
254 | |
255 | dst->coded_width = src->coded_width; |
256 | dst->coded_height = src->coded_height; |
257 | |
258 | dst->has_b_frames = src->has_b_frames; |
259 | dst->idct_algo = src->idct_algo; |
260 | |
261 | dst->bits_per_coded_sample = src->bits_per_coded_sample; |
262 | dst->sample_aspect_ratio = src->sample_aspect_ratio; |
263 | #if FF_API_AFD |
264 | FF_DISABLE_DEPRECATION_WARNINGS |
265 | dst->dtg_active_format = src->dtg_active_format; |
266 | FF_ENABLE_DEPRECATION_WARNINGS |
267 | #endif /* FF_API_AFD */ |
268 | |
269 | dst->profile = src->profile; |
270 | dst->level = src->level; |
271 | |
272 | dst->bits_per_raw_sample = src->bits_per_raw_sample; |
273 | dst->ticks_per_frame = src->ticks_per_frame; |
274 | dst->color_primaries = src->color_primaries; |
275 | |
276 | dst->color_trc = src->color_trc; |
277 | dst->colorspace = src->colorspace; |
278 | dst->color_range = src->color_range; |
279 | dst->chroma_sample_location = src->chroma_sample_location; |
280 | |
281 | dst->hwaccel = src->hwaccel; |
282 | dst->hwaccel_context = src->hwaccel_context; |
283 | |
284 | dst->channels = src->channels; |
285 | dst->sample_rate = src->sample_rate; |
286 | dst->sample_fmt = src->sample_fmt; |
287 | dst->channel_layout = src->channel_layout; |
288 | dst->internal->hwaccel_priv_data = src->internal->hwaccel_priv_data; |
289 | |
290 | if (!!dst->hw_frames_ctx != !!src->hw_frames_ctx || |
291 | (dst->hw_frames_ctx && dst->hw_frames_ctx->data != src->hw_frames_ctx->data)) { |
292 | av_buffer_unref(&dst->hw_frames_ctx); |
293 | |
294 | if (src->hw_frames_ctx) { |
295 | dst->hw_frames_ctx = av_buffer_ref(src->hw_frames_ctx); |
296 | if (!dst->hw_frames_ctx) |
297 | return AVERROR(ENOMEM); |
298 | } |
299 | } |
300 | |
301 | dst->hwaccel_flags = src->hwaccel_flags; |
302 | } |
303 | |
304 | if (for_user) { |
305 | dst->delay = src->thread_count - 1; |
306 | #if FF_API_CODED_FRAME |
307 | FF_DISABLE_DEPRECATION_WARNINGS |
308 | dst->coded_frame = src->coded_frame; |
309 | FF_ENABLE_DEPRECATION_WARNINGS |
310 | #endif |
311 | } else { |
312 | if (dst->codec->update_thread_context) |
313 | err = dst->codec->update_thread_context(dst, src); |
314 | } |
315 | |
316 | return err; |
317 | } |
318 | |
319 | /** |
320 | * Update the next thread's AVCodecContext with values set by the user. |
321 | * |
322 | * @param dst The destination context. |
323 | * @param src The source context. |
324 | * @return 0 on success, negative error code on failure |
325 | */ |
326 | static int update_context_from_user(AVCodecContext *dst, AVCodecContext *src) |
327 | { |
328 | #define copy_fields(s, e) memcpy(&dst->s, &src->s, (char*)&dst->e - (char*)&dst->s); |
329 | dst->flags = src->flags; |
330 | |
331 | dst->draw_horiz_band= src->draw_horiz_band; |
332 | dst->get_buffer2 = src->get_buffer2; |
333 | |
334 | dst->opaque = src->opaque; |
335 | dst->debug = src->debug; |
336 | dst->debug_mv = src->debug_mv; |
337 | |
338 | dst->slice_flags = src->slice_flags; |
339 | dst->flags2 = src->flags2; |
340 | |
341 | copy_fields(skip_loop_filter, subtitle_header); |
342 | |
343 | dst->frame_number = src->frame_number; |
344 | dst->reordered_opaque = src->reordered_opaque; |
345 | dst->thread_safe_callbacks = src->thread_safe_callbacks; |
346 | |
347 | if (src->slice_count && src->slice_offset) { |
348 | if (dst->slice_count < src->slice_count) { |
349 | int err = av_reallocp_array(&dst->slice_offset, src->slice_count, |
350 | sizeof(*dst->slice_offset)); |
351 | if (err < 0) |
352 | return err; |
353 | } |
354 | memcpy(dst->slice_offset, src->slice_offset, |
355 | src->slice_count * sizeof(*dst->slice_offset)); |
356 | } |
357 | dst->slice_count = src->slice_count; |
358 | return 0; |
359 | #undef copy_fields |
360 | } |
361 | |
362 | /// Releases the buffers that this decoding thread was the last user of. |
363 | static void release_delayed_buffers(PerThreadContext *p) |
364 | { |
365 | FrameThreadContext *fctx = p->parent; |
366 | |
367 | while (p->num_released_buffers > 0) { |
368 | AVFrame *f; |
369 | |
370 | pthread_mutex_lock(&fctx->buffer_mutex); |
371 | |
372 | // fix extended data in case the caller screwed it up |
373 | av_assert0(p->avctx->codec_type == AVMEDIA_TYPE_VIDEO || |
374 | p->avctx->codec_type == AVMEDIA_TYPE_AUDIO); |
375 | f = &p->released_buffers[--p->num_released_buffers]; |
376 | f->extended_data = f->data; |
377 | av_frame_unref(f); |
378 | |
379 | pthread_mutex_unlock(&fctx->buffer_mutex); |
380 | } |
381 | } |
382 | |
/**
 * Hand a packet to a worker thread and start it decoding.
 *
 * The worker's context is first refreshed from the user's context and, when
 * a previous worker exists, from that worker once it has left
 * STATE_SETTING_UP. If the user's callbacks are not thread-safe, this
 * function then services the worker's get_buffer()/get_format() requests on
 * the calling thread until the worker finishes setup or finishes decoding.
 *
 * @param p          worker to submit to
 * @param user_avctx the user's codec context
 * @param avpkt      packet to decode; an empty packet is ignored unless the
 *                   codec has AV_CODEC_CAP_DELAY
 * @return 0 on success (or when the packet was ignored), otherwise the error
 *         returned by the context update or by av_packet_ref()
 */
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
                         AVPacket *avpkt)
{
    FrameThreadContext *fctx = p->parent;
    PerThreadContext *prev_thread = fctx->prev_thread;
    const AVCodec *codec = p->avctx->codec;
    int ret;

    if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
        return 0;

    pthread_mutex_lock(&p->mutex);

    ret = update_context_from_user(p->avctx, user_avctx);
    if (ret) {
        pthread_mutex_unlock(&p->mutex);
        return ret;
    }

    release_delayed_buffers(p);

    if (prev_thread) {
        int err;
        /* Wait until the previous worker has left STATE_SETTING_UP before
         * copying its context. */
        if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
            pthread_mutex_lock(&prev_thread->progress_mutex);
            while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
                pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
            pthread_mutex_unlock(&prev_thread->progress_mutex);
        }

        err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
        if (err) {
            pthread_mutex_unlock(&p->mutex);
            return err;
        }
    }

    av_packet_unref(&p->avpkt);
    ret = av_packet_ref(&p->avpkt, avpkt);
    if (ret < 0) {
        pthread_mutex_unlock(&p->mutex);
        av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
        return ret;
    }

    /* Wake the worker: it waits on input_cond for the state to leave
     * STATE_INPUT_READY. */
    atomic_store(&p->state, STATE_SETTING_UP);
    pthread_cond_signal(&p->input_cond);
    pthread_mutex_unlock(&p->mutex);

    /*
     * If the client doesn't have a thread-safe get_buffer(),
     * then decoding threads call back to the main thread,
     * and it calls back to the client here.
     */

    if (!p->avctx->thread_safe_callbacks && (
         p->avctx->get_format != avcodec_default_get_format ||
         p->avctx->get_buffer2 != avcodec_default_get_buffer2)) {
        while (atomic_load(&p->state) != STATE_SETUP_FINISHED && atomic_load(&p->state) != STATE_INPUT_READY) {
            int call_done = 1;
            pthread_mutex_lock(&p->progress_mutex);
            while (atomic_load(&p->state) == STATE_SETTING_UP)
                pthread_cond_wait(&p->progress_cond, &p->progress_mutex);

            /* Run the callback the worker asked for; any other state means
             * there is nothing to service. */
            switch (atomic_load_explicit(&p->state, memory_order_acquire)) {
            case STATE_GET_BUFFER:
                p->result = ff_get_buffer(p->avctx, p->requested_frame, p->requested_flags);
                break;
            case STATE_GET_FORMAT:
                p->result_format = ff_get_format(p->avctx, p->available_formats);
                break;
            default:
                call_done = 0;
                break;
            }
            if (call_done) {
                /* Hand control back to the worker waiting on progress_cond. */
                atomic_store(&p->state, STATE_SETTING_UP);
                pthread_cond_signal(&p->progress_cond);
            }
            pthread_mutex_unlock(&p->progress_mutex);
        }
    }

    fctx->prev_thread = p;
    fctx->next_decoding++;

    return 0;
}
471 | |
/**
 * Submit a packet to the next worker and return the output of the oldest one.
 *
 * The async lock is released on entry and re-acquired before returning on
 * every path.
 *
 * @param avctx           the user's codec context
 * @param picture         receives the decoded frame, if any
 * @param got_picture_ptr set to nonzero when picture holds a frame
 * @param avpkt           input packet
 * @return avpkt->size when no error occurred, otherwise the error from
 *         submit_packet() or from the worker whose output was collected
 */
int ff_thread_decode_frame(AVCodecContext *avctx,
                           AVFrame *picture, int *got_picture_ptr,
                           AVPacket *avpkt)
{
    FrameThreadContext *fctx = avctx->internal->thread_ctx;
    int finished = fctx->next_finished;
    PerThreadContext *p;
    int err;

    /* release the async lock, permitting blocked hwaccel threads to
     * go forward while we are in this function */
    async_unlock(fctx);

    /*
     * Submit a packet to the next decoding thread.
     */

    p = &fctx->threads[fctx->next_decoding];
    err = submit_packet(p, avctx, avpkt);
    if (err)
        goto finish;

    /*
     * If we're still receiving the initial packets, don't return a frame.
     */

    if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
        fctx->delaying = 0;

    if (fctx->delaying) {
        *got_picture_ptr=0;
        if (avpkt->size) {
            err = avpkt->size;
            goto finish;
        }
    }

    /*
     * Return the next available frame from the oldest thread.
     * If we're at the end of the stream, then we have to skip threads that
     * didn't output a frame, because we don't want to accidentally signal
     * EOF (avpkt->size == 0 && *got_picture_ptr == 0).
     */

    do {
        p = &fctx->threads[finished++];

        /* Block until this worker has finished its current packet. */
        if (atomic_load(&p->state) != STATE_INPUT_READY) {
            pthread_mutex_lock(&p->progress_mutex);
            while (atomic_load_explicit(&p->state, memory_order_relaxed) != STATE_INPUT_READY)
                pthread_cond_wait(&p->output_cond, &p->progress_mutex);
            pthread_mutex_unlock(&p->progress_mutex);
        }

        av_frame_move_ref(picture, p->frame);
        *got_picture_ptr = p->got_frame;
        picture->pkt_dts = p->avpkt.dts;

        if (p->result < 0)
            err = p->result;

        /*
         * A later call with avpkt->size == 0 may loop over all threads,
         * including this one, searching for a frame to return before being
         * stopped by the "finished != fctx->next_finished" condition.
         * Make sure we don't mistakenly return the same frame again.
         */
        p->got_frame = 0;

        if (finished >= avctx->thread_count) finished = 0;
    } while (!avpkt->size && !*got_picture_ptr && finished != fctx->next_finished);

    update_context_from_thread(avctx, p->avctx, 1);

    /* Wrap the submission index back to the first worker. */
    if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;

    fctx->next_finished = finished;

    /* return the size of the consumed packet if no error occurred */
    if (err >= 0)
        err = avpkt->size;
finish:
    async_lock(fctx);
    return err;
}
557 | |
558 | void ff_thread_report_progress(ThreadFrame *f, int n, int field) |
559 | { |
560 | PerThreadContext *p; |
561 | atomic_int *progress = f->progress ? (atomic_int*)f->progress->data : NULL; |
562 | |
563 | if (!progress || |
564 | atomic_load_explicit(&progress[field], memory_order_relaxed) >= n) |
565 | return; |
566 | |
567 | p = f->owner[field]->internal->thread_ctx; |
568 | |
569 | pthread_mutex_lock(&p->progress_mutex); |
570 | if (f->owner[field]->debug&FF_DEBUG_THREADS) |
571 | av_log(f->owner[field], AV_LOG_DEBUG, |
572 | "%p finished %d field %d\n", progress, n, field); |
573 | |
574 | atomic_store_explicit(&progress[field], n, memory_order_release); |
575 | |
576 | pthread_cond_broadcast(&p->progress_cond); |
577 | pthread_mutex_unlock(&p->progress_mutex); |
578 | } |
579 | |
580 | void ff_thread_await_progress(ThreadFrame *f, int n, int field) |
581 | { |
582 | PerThreadContext *p; |
583 | atomic_int *progress = f->progress ? (atomic_int*)f->progress->data : NULL; |
584 | |
585 | if (!progress || |
586 | atomic_load_explicit(&progress[field], memory_order_acquire) >= n) |
587 | return; |
588 | |
589 | p = f->owner[field]->internal->thread_ctx; |
590 | |
591 | pthread_mutex_lock(&p->progress_mutex); |
592 | if (f->owner[field]->debug&FF_DEBUG_THREADS) |
593 | av_log(f->owner[field], AV_LOG_DEBUG, |
594 | "thread awaiting %d field %d from %p\n", n, field, progress); |
595 | while (atomic_load_explicit(&progress[field], memory_order_relaxed) < n) |
596 | pthread_cond_wait(&p->progress_cond, &p->progress_mutex); |
597 | pthread_mutex_unlock(&p->progress_mutex); |
598 | } |
599 | |
600 | void ff_thread_finish_setup(AVCodecContext *avctx) { |
601 | PerThreadContext *p = avctx->internal->thread_ctx; |
602 | |
603 | if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return; |
604 | |
605 | if (avctx->hwaccel && !p->hwaccel_serializing) { |
606 | pthread_mutex_lock(&p->parent->hwaccel_mutex); |
607 | p->hwaccel_serializing = 1; |
608 | } |
609 | |
610 | /* this assumes that no hwaccel calls happen before ff_thread_finish_setup() */ |
611 | if (avctx->hwaccel && |
612 | !(avctx->hwaccel->caps_internal & HWACCEL_CAP_ASYNC_SAFE)) { |
613 | p->async_serializing = 1; |
614 | |
615 | async_lock(p->parent); |
616 | } |
617 | |
618 | pthread_mutex_lock(&p->progress_mutex); |
619 | if(atomic_load(&p->state) == STATE_SETUP_FINISHED){ |
620 | av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n"); |
621 | } |
622 | |
623 | atomic_store(&p->state, STATE_SETUP_FINISHED); |
624 | |
625 | pthread_cond_broadcast(&p->progress_cond); |
626 | pthread_mutex_unlock(&p->progress_mutex); |
627 | } |
628 | |
629 | /// Waits for all threads to finish. |
630 | static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count) |
631 | { |
632 | int i; |
633 | |
634 | async_unlock(fctx); |
635 | |
636 | for (i = 0; i < thread_count; i++) { |
637 | PerThreadContext *p = &fctx->threads[i]; |
638 | |
639 | if (atomic_load(&p->state) != STATE_INPUT_READY) { |
640 | pthread_mutex_lock(&p->progress_mutex); |
641 | while (atomic_load(&p->state) != STATE_INPUT_READY) |
642 | pthread_cond_wait(&p->output_cond, &p->progress_mutex); |
643 | pthread_mutex_unlock(&p->progress_mutex); |
644 | } |
645 | p->got_frame = 0; |
646 | } |
647 | |
648 | async_lock(fctx); |
649 | } |
650 | |
/**
 * Stop all worker threads and free every frame-threading resource.
 *
 * @param avctx        the user's codec context
 * @param thread_count number of entries in the worker array to tear down
 */
void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
{
    FrameThreadContext *fctx = avctx->internal->thread_ctx;
    const AVCodec *codec = avctx->codec;
    int i;

    park_frame_worker_threads(fctx, thread_count);

    /* Bring the first worker's context up to date with the last worker
     * that was given a packet. */
    if (fctx->prev_thread && fctx->prev_thread != fctx->threads)
        if (update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0) < 0) {
            av_log(avctx, AV_LOG_ERROR, "Final thread update failed\n");
            /* On failure the is_copy markers of the two contexts are
             * exchanged, the first worker becoming the copy. */
            fctx->prev_thread->avctx->internal->is_copy = fctx->threads->avctx->internal->is_copy;
            fctx->threads->avctx->internal->is_copy = 1;
        }

    for (i = 0; i < thread_count; i++) {
        PerThreadContext *p = &fctx->threads[i];

        /* Ask the worker to exit and wake it from its input wait. */
        pthread_mutex_lock(&p->mutex);
        p->die = 1;
        pthread_cond_signal(&p->input_cond);
        pthread_mutex_unlock(&p->mutex);

        /* Only join threads that were actually created. */
        if (p->thread_init)
            pthread_join(p->thread, NULL);
        p->thread_init=0;

        if (codec->close && p->avctx)
            codec->close(p->avctx);

        release_delayed_buffers(p);
        av_frame_free(&p->frame);
    }

    for (i = 0; i < thread_count; i++) {
        PerThreadContext *p = &fctx->threads[i];

        pthread_mutex_destroy(&p->mutex);
        pthread_mutex_destroy(&p->progress_mutex);
        pthread_cond_destroy(&p->input_cond);
        pthread_cond_destroy(&p->progress_cond);
        pthread_cond_destroy(&p->output_cond);
        av_packet_unref(&p->avpkt);
        av_freep(&p->released_buffers);

        /* priv_data and slice_offset are freed here for every worker
         * except the first. */
        if (i && p->avctx) {
            av_freep(&p->avctx->priv_data);
            av_freep(&p->avctx->slice_offset);
        }

        if (p->avctx) {
            av_freep(&p->avctx->internal);
            av_buffer_unref(&p->avctx->hw_frames_ctx);
        }

        av_freep(&p->avctx);
    }

    av_freep(&fctx->threads);
    pthread_mutex_destroy(&fctx->buffer_mutex);
    pthread_mutex_destroy(&fctx->hwaccel_mutex);
    pthread_mutex_destroy(&fctx->async_mutex);
    pthread_cond_destroy(&fctx->async_cond);

    av_freep(&avctx->internal->thread_ctx);

    if (avctx->priv_data && avctx->codec && avctx->codec->priv_class)
        av_opt_free(avctx->priv_data);
    avctx->codec = NULL;
}
721 | |
722 | int ff_frame_thread_init(AVCodecContext *avctx) |
723 | { |
724 | int thread_count = avctx->thread_count; |
725 | const AVCodec *codec = avctx->codec; |
726 | AVCodecContext *src = avctx; |
727 | FrameThreadContext *fctx; |
728 | int i, err = 0; |
729 | |
730 | #if HAVE_W32THREADS |
731 | w32thread_init(); |
732 | #endif |
733 | |
734 | if (!thread_count) { |
735 | int nb_cpus = av_cpu_count(); |
736 | if ((avctx->debug & (FF_DEBUG_VIS_QP | FF_DEBUG_VIS_MB_TYPE)) || avctx->debug_mv) |
737 | nb_cpus = 1; |
738 | // use number of cores + 1 as thread count if there is more than one |
739 | if (nb_cpus > 1) |
740 | thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); |
741 | else |
742 | thread_count = avctx->thread_count = 1; |
743 | } |
744 | |
745 | if (thread_count <= 1) { |
746 | avctx->active_thread_type = 0; |
747 | return 0; |
748 | } |
749 | |
750 | avctx->internal->thread_ctx = fctx = av_mallocz(sizeof(FrameThreadContext)); |
751 | if (!fctx) |
752 | return AVERROR(ENOMEM); |
753 | |
754 | fctx->threads = av_mallocz_array(thread_count, sizeof(PerThreadContext)); |
755 | if (!fctx->threads) { |
756 | av_freep(&avctx->internal->thread_ctx); |
757 | return AVERROR(ENOMEM); |
758 | } |
759 | |
760 | pthread_mutex_init(&fctx->buffer_mutex, NULL); |
761 | pthread_mutex_init(&fctx->hwaccel_mutex, NULL); |
762 | pthread_mutex_init(&fctx->async_mutex, NULL); |
763 | pthread_cond_init(&fctx->async_cond, NULL); |
764 | |
765 | fctx->async_lock = 1; |
766 | fctx->delaying = 1; |
767 | |
768 | for (i = 0; i < thread_count; i++) { |
769 | AVCodecContext *copy = av_malloc(sizeof(AVCodecContext)); |
770 | PerThreadContext *p = &fctx->threads[i]; |
771 | |
772 | pthread_mutex_init(&p->mutex, NULL); |
773 | pthread_mutex_init(&p->progress_mutex, NULL); |
774 | pthread_cond_init(&p->input_cond, NULL); |
775 | pthread_cond_init(&p->progress_cond, NULL); |
776 | pthread_cond_init(&p->output_cond, NULL); |
777 | |
778 | p->frame = av_frame_alloc(); |
779 | if (!p->frame) { |
780 | av_freep(©); |
781 | err = AVERROR(ENOMEM); |
782 | goto error; |
783 | } |
784 | |
785 | p->parent = fctx; |
786 | p->avctx = copy; |
787 | |
788 | if (!copy) { |
789 | err = AVERROR(ENOMEM); |
790 | goto error; |
791 | } |
792 | |
793 | *copy = *src; |
794 | |
795 | copy->internal = av_malloc(sizeof(AVCodecInternal)); |
796 | if (!copy->internal) { |
797 | copy->priv_data = NULL; |
798 | err = AVERROR(ENOMEM); |
799 | goto error; |
800 | } |
801 | *copy->internal = *src->internal; |
802 | copy->internal->thread_ctx = p; |
803 | copy->internal->pkt = &p->avpkt; |
804 | |
805 | if (!i) { |
806 | src = copy; |
807 | |
808 | if (codec->init) |
809 | err = codec->init(copy); |
810 | |
811 | update_context_from_thread(avctx, copy, 1); |
812 | } else { |
813 | copy->priv_data = av_malloc(codec->priv_data_size); |
814 | if (!copy->priv_data) { |
815 | err = AVERROR(ENOMEM); |
816 | goto error; |
817 | } |
818 | memcpy(copy->priv_data, src->priv_data, codec->priv_data_size); |
819 | copy->internal->is_copy = 1; |
820 | |
821 | if (codec->init_thread_copy) |
822 | err = codec->init_thread_copy(copy); |
823 | } |
824 | |
825 | if (err) goto error; |
826 | |
827 | err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p)); |
828 | p->thread_init= !err; |
829 | if(!p->thread_init) |
830 | goto error; |
831 | } |
832 | |
833 | return 0; |
834 | |
835 | error: |
836 | ff_frame_thread_free(avctx, i+1); |
837 | |
838 | return err; |
839 | } |
840 | |
841 | void ff_thread_flush(AVCodecContext *avctx) |
842 | { |
843 | int i; |
844 | FrameThreadContext *fctx = avctx->internal->thread_ctx; |
845 | |
846 | if (!fctx) return; |
847 | |
848 | park_frame_worker_threads(fctx, avctx->thread_count); |
849 | if (fctx->prev_thread) { |
850 | if (fctx->prev_thread != &fctx->threads[0]) |
851 | update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0); |
852 | } |
853 | |
854 | fctx->next_decoding = fctx->next_finished = 0; |
855 | fctx->delaying = 1; |
856 | fctx->prev_thread = NULL; |
857 | for (i = 0; i < avctx->thread_count; i++) { |
858 | PerThreadContext *p = &fctx->threads[i]; |
859 | // Make sure decode flush calls with size=0 won't return old frames |
860 | p->got_frame = 0; |
861 | av_frame_unref(p->frame); |
862 | |
863 | release_delayed_buffers(p); |
864 | |
865 | if (avctx->codec->flush) |
866 | avctx->codec->flush(p->avctx); |
867 | } |
868 | } |
869 | |
870 | int ff_thread_can_start_frame(AVCodecContext *avctx) |
871 | { |
872 | PerThreadContext *p = avctx->internal->thread_ctx; |
873 | if ((avctx->active_thread_type&FF_THREAD_FRAME) && atomic_load(&p->state) != STATE_SETTING_UP && |
874 | (avctx->codec->update_thread_context || !THREAD_SAFE_CALLBACKS(avctx))) { |
875 | return 0; |
876 | } |
877 | return 1; |
878 | } |
879 | |
880 | static int thread_get_buffer_internal(AVCodecContext *avctx, ThreadFrame *f, int flags) |
881 | { |
882 | PerThreadContext *p = avctx->internal->thread_ctx; |
883 | int err; |
884 | |
885 | f->owner[0] = f->owner[1] = avctx; |
886 | |
887 | ff_init_buffer_info(avctx, f->f); |
888 | |
889 | if (!(avctx->active_thread_type & FF_THREAD_FRAME)) |
890 | return ff_get_buffer(avctx, f->f, flags); |
891 | |
892 | if (atomic_load(&p->state) != STATE_SETTING_UP && |
893 | (avctx->codec->update_thread_context || !THREAD_SAFE_CALLBACKS(avctx))) { |
894 | av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); |
895 | return -1; |
896 | } |
897 | |
898 | if (avctx->internal->allocate_progress) { |
899 | atomic_int *progress; |
900 | f->progress = av_buffer_alloc(2 * sizeof(*progress)); |
901 | if (!f->progress) { |
902 | return AVERROR(ENOMEM); |
903 | } |
904 | progress = (atomic_int*)f->progress->data; |
905 | |
906 | atomic_init(&progress[0], -1); |
907 | atomic_init(&progress[1], -1); |
908 | } |
909 | |
910 | pthread_mutex_lock(&p->parent->buffer_mutex); |
911 | if (avctx->thread_safe_callbacks || |
912 | avctx->get_buffer2 == avcodec_default_get_buffer2) { |
913 | err = ff_get_buffer(avctx, f->f, flags); |
914 | } else { |
915 | pthread_mutex_lock(&p->progress_mutex); |
916 | p->requested_frame = f->f; |
917 | p->requested_flags = flags; |
918 | atomic_store_explicit(&p->state, STATE_GET_BUFFER, memory_order_release); |
919 | pthread_cond_broadcast(&p->progress_cond); |
920 | |
921 | while (atomic_load(&p->state) != STATE_SETTING_UP) |
922 | pthread_cond_wait(&p->progress_cond, &p->progress_mutex); |
923 | |
924 | err = p->result; |
925 | |
926 | pthread_mutex_unlock(&p->progress_mutex); |
927 | |
928 | } |
929 | if (!THREAD_SAFE_CALLBACKS(avctx) && !avctx->codec->update_thread_context) |
930 | ff_thread_finish_setup(avctx); |
931 | if (err) |
932 | av_buffer_unref(&f->progress); |
933 | |
934 | pthread_mutex_unlock(&p->parent->buffer_mutex); |
935 | |
936 | return err; |
937 | } |
938 | |
939 | enum AVPixelFormat ff_thread_get_format(AVCodecContext *avctx, const enum AVPixelFormat *fmt) |
940 | { |
941 | enum AVPixelFormat res; |
942 | PerThreadContext *p = avctx->internal->thread_ctx; |
943 | if (!(avctx->active_thread_type & FF_THREAD_FRAME) || avctx->thread_safe_callbacks || |
944 | avctx->get_format == avcodec_default_get_format) |
945 | return ff_get_format(avctx, fmt); |
946 | if (atomic_load(&p->state) != STATE_SETTING_UP) { |
947 | av_log(avctx, AV_LOG_ERROR, "get_format() cannot be called after ff_thread_finish_setup()\n"); |
948 | return -1; |
949 | } |
950 | pthread_mutex_lock(&p->progress_mutex); |
951 | p->available_formats = fmt; |
952 | atomic_store(&p->state, STATE_GET_FORMAT); |
953 | pthread_cond_broadcast(&p->progress_cond); |
954 | |
955 | while (atomic_load(&p->state) != STATE_SETTING_UP) |
956 | pthread_cond_wait(&p->progress_cond, &p->progress_mutex); |
957 | |
958 | res = p->result_format; |
959 | |
960 | pthread_mutex_unlock(&p->progress_mutex); |
961 | |
962 | return res; |
963 | } |
964 | |
965 | int ff_thread_get_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags) |
966 | { |
967 | int ret = thread_get_buffer_internal(avctx, f, flags); |
968 | if (ret < 0) |
969 | av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n"); |
970 | return ret; |
971 | } |
972 | |
973 | void ff_thread_release_buffer(AVCodecContext *avctx, ThreadFrame *f) |
974 | { |
975 | PerThreadContext *p = avctx->internal->thread_ctx; |
976 | FrameThreadContext *fctx; |
977 | AVFrame *dst, *tmp; |
978 | int can_direct_free = !(avctx->active_thread_type & FF_THREAD_FRAME) || |
979 | avctx->thread_safe_callbacks || |
980 | avctx->get_buffer2 == avcodec_default_get_buffer2; |
981 | |
982 | if (!f->f || !f->f->buf[0]) |
983 | return; |
984 | |
985 | if (avctx->debug & FF_DEBUG_BUFFERS) |
986 | av_log(avctx, AV_LOG_DEBUG, "thread_release_buffer called on pic %p\n", f); |
987 | |
988 | av_buffer_unref(&f->progress); |
989 | f->owner[0] = f->owner[1] = NULL; |
990 | |
991 | if (can_direct_free) { |
992 | av_frame_unref(f->f); |
993 | return; |
994 | } |
995 | |
996 | fctx = p->parent; |
997 | pthread_mutex_lock(&fctx->buffer_mutex); |
998 | |
999 | if (p->num_released_buffers + 1 >= INT_MAX / sizeof(*p->released_buffers)) |
1000 | goto fail; |
1001 | tmp = av_fast_realloc(p->released_buffers, &p->released_buffers_allocated, |
1002 | (p->num_released_buffers + 1) * |
1003 | sizeof(*p->released_buffers)); |
1004 | if (!tmp) |
1005 | goto fail; |
1006 | p->released_buffers = tmp; |
1007 | |
1008 | dst = &p->released_buffers[p->num_released_buffers]; |
1009 | av_frame_move_ref(dst, f->f); |
1010 | |
1011 | p->num_released_buffers++; |
1012 | |
1013 | fail: |
1014 | pthread_mutex_unlock(&fctx->buffer_mutex); |
1015 | } |
1016 |