2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 * Slice multithreading support functions
22 * @see doc/multithreading.txt
30 #include "compat/w32pthreads.h"
32 #include "compat/os2threads.h"
37 #include "pthread_internal.h"
40 #include "libavutil/common.h"
41 #include "libavutil/cpu.h"
42 #include "libavutil/mem.h"
44 typedef int (action_func
)(AVCodecContext
*c
, void *arg
);
45 typedef int (action_func2
)(AVCodecContext
*c
, void *arg
, int jobnr
, int threadnr
);
47 typedef struct SliceThreadContext
{
57 pthread_cond_t last_job_cond
;
58 pthread_cond_t current_job_cond
;
59 pthread_mutex_t current_job_lock
;
60 unsigned current_execute
;
67 pthread_cond_t
*progress_cond
;
68 pthread_mutex_t
*progress_mutex
;
71 static void* attribute_align_arg
worker(void *v
)
73 AVCodecContext
*avctx
= v
;
74 SliceThreadContext
*c
= avctx
->internal
->thread_ctx
;
75 unsigned last_execute
= 0;
76 int our_job
= c
->job_count
;
77 int thread_count
= avctx
->thread_count
;
80 pthread_mutex_lock(&c
->current_job_lock
);
81 self_id
= c
->current_job
++;
83 while (our_job
>= c
->job_count
) {
84 if (c
->current_job
== thread_count
+ c
->job_count
)
85 pthread_cond_signal(&c
->last_job_cond
);
87 while (last_execute
== c
->current_execute
&& !c
->done
)
88 pthread_cond_wait(&c
->current_job_cond
, &c
->current_job_lock
);
89 last_execute
= c
->current_execute
;
93 pthread_mutex_unlock(&c
->current_job_lock
);
97 pthread_mutex_unlock(&c
->current_job_lock
);
99 c
->rets
[our_job
%c
->rets_count
] = c
->func
? c
->func(avctx
, (char*)c
->args
+ our_job
*c
->job_size
):
100 c
->func2(avctx
, c
->args
, our_job
, self_id
);
102 pthread_mutex_lock(&c
->current_job_lock
);
103 our_job
= c
->current_job
++;
107 void ff_slice_thread_free(AVCodecContext
*avctx
)
109 SliceThreadContext
*c
= avctx
->internal
->thread_ctx
;
112 pthread_mutex_lock(&c
->current_job_lock
);
114 pthread_cond_broadcast(&c
->current_job_cond
);
115 pthread_mutex_unlock(&c
->current_job_lock
);
117 for (i
=0; i
<avctx
->thread_count
; i
++)
118 pthread_join(c
->workers
[i
], NULL
);
120 pthread_mutex_destroy(&c
->current_job_lock
);
121 pthread_cond_destroy(&c
->current_job_cond
);
122 pthread_cond_destroy(&c
->last_job_cond
);
123 av_freep(&c
->workers
);
124 av_freep(&avctx
->internal
->thread_ctx
);
127 static av_always_inline
void thread_park_workers(SliceThreadContext
*c
, int thread_count
)
129 while (c
->current_job
!= thread_count
+ c
->job_count
)
130 pthread_cond_wait(&c
->last_job_cond
, &c
->current_job_lock
);
131 pthread_mutex_unlock(&c
->current_job_lock
);
134 static int thread_execute(AVCodecContext
*avctx
, action_func
* func
, void *arg
, int *ret
, int job_count
, int job_size
)
136 SliceThreadContext
*c
= avctx
->internal
->thread_ctx
;
139 if (!(avctx
->active_thread_type
&FF_THREAD_SLICE
) || avctx
->thread_count
<= 1)
140 return avcodec_default_execute(avctx
, func
, arg
, ret
, job_count
, job_size
);
145 pthread_mutex_lock(&c
->current_job_lock
);
147 c
->current_job
= avctx
->thread_count
;
148 c
->job_count
= job_count
;
149 c
->job_size
= job_size
;
154 c
->rets_count
= job_count
;
156 c
->rets
= &dummy_ret
;
159 c
->current_execute
++;
160 pthread_cond_broadcast(&c
->current_job_cond
);
162 thread_park_workers(c
, avctx
->thread_count
);
167 static int thread_execute2(AVCodecContext
*avctx
, action_func2
* func2
, void *arg
, int *ret
, int job_count
)
169 SliceThreadContext
*c
= avctx
->internal
->thread_ctx
;
171 return thread_execute(avctx
, NULL
, arg
, ret
, job_count
, 0);
174 int ff_slice_thread_init(AVCodecContext
*avctx
)
177 SliceThreadContext
*c
;
178 int thread_count
= avctx
->thread_count
;
185 int nb_cpus
= av_cpu_count();
187 nb_cpus
= FFMIN(nb_cpus
, (avctx
->height
+15)/16);
188 // use number of cores + 1 as thread count if there is more than one
190 thread_count
= avctx
->thread_count
= FFMIN(nb_cpus
+ 1, MAX_AUTO_THREADS
);
192 thread_count
= avctx
->thread_count
= 1;
195 if (thread_count
<= 1) {
196 avctx
->active_thread_type
= 0;
200 c
= av_mallocz(sizeof(SliceThreadContext
));
204 c
->workers
= av_mallocz_array(thread_count
, sizeof(pthread_t
));
210 avctx
->internal
->thread_ctx
= c
;
215 pthread_cond_init(&c
->current_job_cond
, NULL
);
216 pthread_cond_init(&c
->last_job_cond
, NULL
);
217 pthread_mutex_init(&c
->current_job_lock
, NULL
);
218 pthread_mutex_lock(&c
->current_job_lock
);
219 for (i
=0; i
<thread_count
; i
++) {
220 if(pthread_create(&c
->workers
[i
], NULL
, worker
, avctx
)) {
221 avctx
->thread_count
= i
;
222 pthread_mutex_unlock(&c
->current_job_lock
);
223 ff_thread_free(avctx
);
228 thread_park_workers(c
, thread_count
);
230 avctx
->execute
= thread_execute
;
231 avctx
->execute2
= thread_execute2
;
235 void ff_thread_report_progress2(AVCodecContext
*avctx
, int field
, int thread
, int n
)
237 SliceThreadContext
*p
= avctx
->internal
->thread_ctx
;
238 int *entries
= p
->entries
;
240 pthread_mutex_lock(&p
->progress_mutex
[thread
]);
242 pthread_cond_signal(&p
->progress_cond
[thread
]);
243 pthread_mutex_unlock(&p
->progress_mutex
[thread
]);
246 void ff_thread_await_progress2(AVCodecContext
*avctx
, int field
, int thread
, int shift
)
248 SliceThreadContext
*p
= avctx
->internal
->thread_ctx
;
249 int *entries
= p
->entries
;
251 if (!entries
|| !field
) return;
253 thread
= thread
? thread
- 1 : p
->thread_count
- 1;
255 pthread_mutex_lock(&p
->progress_mutex
[thread
]);
256 while ((entries
[field
- 1] - entries
[field
]) < shift
){
257 pthread_cond_wait(&p
->progress_cond
[thread
], &p
->progress_mutex
[thread
]);
259 pthread_mutex_unlock(&p
->progress_mutex
[thread
]);
262 int ff_alloc_entries(AVCodecContext
*avctx
, int count
)
266 if (avctx
->active_thread_type
& FF_THREAD_SLICE
) {
267 SliceThreadContext
*p
= avctx
->internal
->thread_ctx
;
268 p
->thread_count
= avctx
->thread_count
;
269 p
->entries
= av_mallocz_array(count
, sizeof(int));
271 p
->progress_mutex
= av_malloc_array(p
->thread_count
, sizeof(pthread_mutex_t
));
272 p
->progress_cond
= av_malloc_array(p
->thread_count
, sizeof(pthread_cond_t
));
274 if (!p
->entries
|| !p
->progress_mutex
|| !p
->progress_cond
) {
275 av_freep(&p
->entries
);
276 av_freep(&p
->progress_mutex
);
277 av_freep(&p
->progress_cond
);
278 return AVERROR(ENOMEM
);
280 p
->entries_count
= count
;
282 for (i
= 0; i
< p
->thread_count
; i
++) {
283 pthread_mutex_init(&p
->progress_mutex
[i
], NULL
);
284 pthread_cond_init(&p
->progress_cond
[i
], NULL
);
291 void ff_reset_entries(AVCodecContext
*avctx
)
293 SliceThreadContext
*p
= avctx
->internal
->thread_ctx
;
294 memset(p
->entries
, 0, p
->entries_count
* sizeof(int));