blob: 7c5cd2463cee7e75f3a1a36e8746b52760f0ef40
1 | /* |
2 | * Copyright (c) 2014 Nicolas George |
3 | * |
4 | * This file is part of FFmpeg. |
5 | * |
6 | * FFmpeg is free software; you can redistribute it and/or |
7 | * modify it under the terms of the GNU Lesser General Public License |
8 | * as published by the Free Software Foundation; either |
9 | * version 2.1 of the License, or (at your option) any later version. |
10 | * |
11 | * FFmpeg is distributed in the hope that it will be useful, |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | * GNU Lesser General Public License for more details. |
15 | * |
16 | * You should have received a copy of the GNU Lesser General Public License |
17 | * along with FFmpeg; if not, write to the Free Software Foundation, Inc., |
18 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
19 | */ |
20 | |
21 | #include "fifo.h" |
22 | #include "threadmessage.h" |
23 | #include "thread.h" |
24 | |
/**
 * Private state of a thread message queue.
 *
 * When thread support is compiled out the structure only holds a
 * placeholder member so that the type remains complete.
 */
struct AVThreadMessageQueue {
#if HAVE_THREADS
    AVFifoBuffer *fifo;             /* byte FIFO holding the queued messages */
    pthread_mutex_t lock;           /* taken around FIFO and error-state accesses */
    pthread_cond_t cond_recv;       /* signalled on message write, broadcast when err_recv is set */
    pthread_cond_t cond_send;       /* signalled on message read, broadcast on flush or when err_send is set */
    int err_send;                   /* non-zero: value returned to senders */
    int err_recv;                   /* non-zero: value returned to receivers once no full message is left */
    unsigned elsize;                /* size in bytes of one message */
    void (*free_func)(void *msg);   /* optional callback applied to each message dropped by a flush */
#else
    int dummy;                      /* placeholder member for builds without threads */
#endif
};
39 | |
40 | int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, |
41 | unsigned nelem, |
42 | unsigned elsize) |
43 | { |
44 | #if HAVE_THREADS |
45 | AVThreadMessageQueue *rmq; |
46 | int ret = 0; |
47 | |
48 | if (nelem > INT_MAX / elsize) |
49 | return AVERROR(EINVAL); |
50 | if (!(rmq = av_mallocz(sizeof(*rmq)))) |
51 | return AVERROR(ENOMEM); |
52 | if ((ret = pthread_mutex_init(&rmq->lock, NULL))) { |
53 | av_free(rmq); |
54 | return AVERROR(ret); |
55 | } |
56 | if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) { |
57 | pthread_mutex_destroy(&rmq->lock); |
58 | av_free(rmq); |
59 | return AVERROR(ret); |
60 | } |
61 | if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) { |
62 | pthread_cond_destroy(&rmq->cond_recv); |
63 | pthread_mutex_destroy(&rmq->lock); |
64 | av_free(rmq); |
65 | return AVERROR(ret); |
66 | } |
67 | if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) { |
68 | pthread_cond_destroy(&rmq->cond_send); |
69 | pthread_cond_destroy(&rmq->cond_recv); |
70 | pthread_mutex_destroy(&rmq->lock); |
71 | av_free(rmq); |
72 | return AVERROR(ret); |
73 | } |
74 | rmq->elsize = elsize; |
75 | *mq = rmq; |
76 | return 0; |
77 | #else |
78 | *mq = NULL; |
79 | return AVERROR(ENOSYS); |
80 | #endif /* HAVE_THREADS */ |
81 | } |
82 | |
83 | void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, |
84 | void (*free_func)(void *msg)) |
85 | { |
86 | #if HAVE_THREADS |
87 | mq->free_func = free_func; |
88 | #endif |
89 | } |
90 | |
91 | void av_thread_message_queue_free(AVThreadMessageQueue **mq) |
92 | { |
93 | #if HAVE_THREADS |
94 | if (*mq) { |
95 | av_thread_message_flush(*mq); |
96 | av_fifo_freep(&(*mq)->fifo); |
97 | pthread_cond_destroy(&(*mq)->cond_send); |
98 | pthread_cond_destroy(&(*mq)->cond_recv); |
99 | pthread_mutex_destroy(&(*mq)->lock); |
100 | av_freep(mq); |
101 | } |
102 | #endif |
103 | } |
104 | |
105 | #if HAVE_THREADS |
106 | |
107 | static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq, |
108 | void *msg, |
109 | unsigned flags) |
110 | { |
111 | while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) { |
112 | if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) |
113 | return AVERROR(EAGAIN); |
114 | pthread_cond_wait(&mq->cond_send, &mq->lock); |
115 | } |
116 | if (mq->err_send) |
117 | return mq->err_send; |
118 | av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL); |
119 | /* one message is sent, signal one receiver */ |
120 | pthread_cond_signal(&mq->cond_recv); |
121 | return 0; |
122 | } |
123 | |
124 | static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq, |
125 | void *msg, |
126 | unsigned flags) |
127 | { |
128 | while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) { |
129 | if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) |
130 | return AVERROR(EAGAIN); |
131 | pthread_cond_wait(&mq->cond_recv, &mq->lock); |
132 | } |
133 | if (av_fifo_size(mq->fifo) < mq->elsize) |
134 | return mq->err_recv; |
135 | av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL); |
136 | /* one message space appeared, signal one sender */ |
137 | pthread_cond_signal(&mq->cond_send); |
138 | return 0; |
139 | } |
140 | |
141 | #endif /* HAVE_THREADS */ |
142 | |
143 | int av_thread_message_queue_send(AVThreadMessageQueue *mq, |
144 | void *msg, |
145 | unsigned flags) |
146 | { |
147 | #if HAVE_THREADS |
148 | int ret; |
149 | |
150 | pthread_mutex_lock(&mq->lock); |
151 | ret = av_thread_message_queue_send_locked(mq, msg, flags); |
152 | pthread_mutex_unlock(&mq->lock); |
153 | return ret; |
154 | #else |
155 | return AVERROR(ENOSYS); |
156 | #endif /* HAVE_THREADS */ |
157 | } |
158 | |
159 | int av_thread_message_queue_recv(AVThreadMessageQueue *mq, |
160 | void *msg, |
161 | unsigned flags) |
162 | { |
163 | #if HAVE_THREADS |
164 | int ret; |
165 | |
166 | pthread_mutex_lock(&mq->lock); |
167 | ret = av_thread_message_queue_recv_locked(mq, msg, flags); |
168 | pthread_mutex_unlock(&mq->lock); |
169 | return ret; |
170 | #else |
171 | return AVERROR(ENOSYS); |
172 | #endif /* HAVE_THREADS */ |
173 | } |
174 | |
175 | void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, |
176 | int err) |
177 | { |
178 | #if HAVE_THREADS |
179 | pthread_mutex_lock(&mq->lock); |
180 | mq->err_send = err; |
181 | pthread_cond_broadcast(&mq->cond_send); |
182 | pthread_mutex_unlock(&mq->lock); |
183 | #endif /* HAVE_THREADS */ |
184 | } |
185 | |
186 | void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, |
187 | int err) |
188 | { |
189 | #if HAVE_THREADS |
190 | pthread_mutex_lock(&mq->lock); |
191 | mq->err_recv = err; |
192 | pthread_cond_broadcast(&mq->cond_recv); |
193 | pthread_mutex_unlock(&mq->lock); |
194 | #endif /* HAVE_THREADS */ |
195 | } |
196 | |
#if HAVE_THREADS
/**
 * Adapter that lets the queue's free_func be used as a FIFO peek
 * callback: the queue arrives as the opaque first argument and the
 * message pointer is forwarded to mq->free_func. The size argument is
 * not used.
 */
static void free_func_wrap(void *arg, void *msg, int size)
{
    ((AVThreadMessageQueue *)arg)->free_func(msg);
}
#endif /* HAVE_THREADS */
204 | |
205 | void av_thread_message_flush(AVThreadMessageQueue *mq) |
206 | { |
207 | #if HAVE_THREADS |
208 | int used, off; |
209 | void *free_func = mq->free_func; |
210 | |
211 | pthread_mutex_lock(&mq->lock); |
212 | used = av_fifo_size(mq->fifo); |
213 | if (free_func) |
214 | for (off = 0; off < used; off += mq->elsize) |
215 | av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap); |
216 | av_fifo_drain(mq->fifo, used); |
217 | /* only the senders need to be notified since the queue is empty and there |
218 | * is nothing to read */ |
219 | pthread_cond_broadcast(&mq->cond_send); |
220 | pthread_mutex_unlock(&mq->lock); |
221 | #endif /* HAVE_THREADS */ |
222 | } |
223 |