3 * This file is part of FFmpeg.
5 * FFmpeg is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * FFmpeg is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with FFmpeg; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 * Libavfilter multithreading support
27 #include "libavutil/common.h"
28 #include "libavutil/cpu.h"
29 #include "libavutil/mem.h"
38 #include "compat/os2threads.h"
40 #include "compat/w32pthreads.h"
/**
 * Shared state of the slice-threading worker pool.
 *
 * NOTE(review): only part of the member list is visible in this listing.
 * The functions in this file also access nb_threads, workers, ctx, arg,
 * rets, nb_rets, nb_jobs, current_job and done through this struct.
 */
43 typedef struct ThreadContext
{
/* job callback; workers invoke it as func(ctx, arg, job_index, nb_jobs) */
48 avfilter_action_func
*func
;
50 /* per-execute parameters */
/* signalled by a worker once current_job == nb_threads + nb_jobs;
 * slice_thread_park_workers() waits on it */
57 pthread_cond_t last_job_cond
;
/* broadcast by thread_execute() and slice_thread_uninit(); workers wait on it */
58 pthread_cond_t current_job_cond
;
/* mutex paired with both condition variables above */
59 pthread_mutex_t current_job_lock
;
/* incremented by every thread_execute() call; each worker compares it with
 * the value it saw last to detect that a new round was published */
61 unsigned int current_execute
;
/**
 * Thread entry point of a pool worker (handed to pthread_create() by
 * thread_init_internal()).
 *
 * With current_job_lock held, the worker draws a number from
 * c->current_job++ into self_id. While its job index is >= c->nb_jobs there
 * is nothing for it to run: it signals last_job_cond if current_job has
 * reached nb_threads + nb_jobs, then waits on current_job_cond until
 * current_execute differs from the last value it saw or c->done is non-zero.
 * When it does have a job, it releases the lock, calls c->func() for that
 * job, stores the result in rets[our_job % nb_rets], re-acquires the lock
 * and draws its next job index from c->current_job++.
 *
 * NOTE(review): the enclosing loop, the use of self_id after waking up and
 * the exit path taken when c->done is set are not visible in this listing.
 *
 * @param v  pool state; used below as the ThreadContext pointer c
 */
65 static void* attribute_align_arg
worker(void *v
)
/* start with an out-of-range job index so the first pass goes to the wait loop */
68 int our_job
= c
->nb_jobs
;
69 int nb_threads
= c
->nb_threads
;
70 unsigned int last_execute
= 0;
73 pthread_mutex_lock(&c
->current_job_lock
);
74 self_id
= c
->current_job
++;
76 while (our_job
>= c
->nb_jobs
) {
/* counter reached nb_threads + nb_jobs: wake the thread blocked in
 * slice_thread_park_workers() */
77 if (c
->current_job
== nb_threads
+ c
->nb_jobs
)
78 pthread_cond_signal(&c
->last_job_cond
);
/* sleep until a new round is published (current_execute changes) or
 * c->done becomes non-zero; the loop guards against spurious wakeups */
80 while (last_execute
== c
->current_execute
&& !c
->done
)
81 pthread_cond_wait(&c
->current_job_cond
, &c
->current_job_lock
);
82 last_execute
= c
->current_execute
;
86 pthread_mutex_unlock(&c
->current_job_lock
);
/* the job callback runs without the lock held */
90 pthread_mutex_unlock(&c
->current_job_lock
);
92 c
->rets
[our_job
% c
->nb_rets
] = c
->func(c
->ctx
, c
->arg
, our_job
, c
->nb_jobs
);
/* claim the next job index under the lock */
94 pthread_mutex_lock(&c
->current_job_lock
);
95 our_job
= c
->current_job
++;
/**
 * Tear down the worker pool.
 *
 * Broadcasts current_job_cond while holding current_job_lock, joins the
 * first c->nb_threads entries of c->workers, destroys the mutex and both
 * condition variables, and releases the workers array with av_freep().
 *
 * NOTE(review): workers only leave their wait loop when current_execute
 * changes or c->done is non-zero; the statement that sets c->done is not
 * visible in this listing -- confirm it sits between the lock and the
 * broadcast.
 *
 * @param c  pool state to shut down
 */
99 static void slice_thread_uninit(ThreadContext
*c
)
103 pthread_mutex_lock(&c
->current_job_lock
);
/* wake every worker blocked on current_job_cond */
105 pthread_cond_broadcast(&c
->current_job_cond
);
106 pthread_mutex_unlock(&c
->current_job_lock
);
/* wait for each worker thread to terminate before destroying the primitives */
108 for (i
= 0; i
< c
->nb_threads
; i
++)
109 pthread_join(c
->workers
[i
], NULL
);
111 pthread_mutex_destroy(&c
->current_job_lock
);
112 pthread_cond_destroy(&c
->current_job_cond
);
113 pthread_cond_destroy(&c
->last_job_cond
);
114 av_freep(&c
->workers
);
117 static void slice_thread_park_workers(ThreadContext
*c
)
119 while (c
->current_job
!= c
->nb_threads
+ c
->nb_jobs
)
120 pthread_cond_wait(&c
->last_job_cond
, &c
->current_job_lock
);
121 pthread_mutex_unlock(&c
->current_job_lock
);
/**
 * Publish one round of nb_jobs jobs to the worker pool and wait for it.
 *
 * The pool state is taken from ctx->graph->internal->thread. With
 * current_job_lock held, current_job is reset to nb_threads, nb_jobs and the
 * result-array fields are recorded, current_execute is incremented and
 * current_job_cond is broadcast to wake the workers. The function then calls
 * slice_thread_park_workers(), which waits on last_job_cond and releases the
 * lock.
 *
 * NOTE(review): several statements are not visible in this listing. The
 * assignments to nb_rets and rets shown below presumably belong to different
 * branches depending on whether ret is NULL, and the return statement is
 * missing -- confirm against the full source.
 *
 * @param ctx      filter context whose graph owns the pool
 * @param func     job callback (not referenced in the visible statements)
 * @param arg      opaque argument (not referenced in the visible statements)
 * @param ret      caller-provided result array (handling not visible)
 * @param nb_jobs  number of jobs in this round
 */
124 static int thread_execute(AVFilterContext
*ctx
, avfilter_action_func
*func
,
125 void *arg
, int *ret
, int nb_jobs
)
127 ThreadContext
*c
= ctx
->graph
->internal
->thread
;
133 pthread_mutex_lock(&c
->current_job_lock
);
/* job indices handed out through current_job++ start at nb_threads */
135 c
->current_job
= c
->nb_threads
;
136 c
->nb_jobs
= nb_jobs
;
142 c
->nb_rets
= nb_jobs
;
144 c
->rets
= &dummy_ret
;
/* a changed current_execute is what workers look for after being woken */
147 c
->current_execute
++;
149 pthread_cond_broadcast(&c
->current_job_cond
);
/* waits for the round to finish and unlocks current_job_lock */
151 slice_thread_park_workers(c
);
/**
 * Create the worker pool.
 *
 * Stores the thread count in c->nb_threads, allocates a zeroed workers array
 * with av_mallocz_array(), initialises both condition variables and the
 * mutex, and then, with current_job_lock held, starts nb_threads threads
 * running worker(). It finally calls slice_thread_park_workers(), which
 * waits on last_job_cond and releases the lock, and returns c->nb_threads.
 *
 * NOTE(review): the conditions around the CPU-count default, the allocation
 * check and the pthread_create() error path are not visible in this listing;
 * the AVERROR(ENOMEM) return presumably follows a failed allocation and the
 * unlock + slice_thread_uninit() pair presumably handles a failed
 * pthread_create() -- confirm. The return values of the pthread_*_init()
 * calls are not checked in the visible code.
 *
 * @param c           zero-initialised pool state to fill in
 * @param nb_threads  requested thread count
 * @return c->nb_threads on the visible success path, AVERROR(ENOMEM) on the
 *         visible error path
 */
156 static int thread_init_internal(ThreadContext
*c
, int nb_threads
)
161 int nb_cpus
= av_cpu_count();
162 // use number of cores + 1 as thread count if there is more than one
164 nb_threads
= nb_cpus
+ 1;
172 c
->nb_threads
= nb_threads
;
173 c
->workers
= av_mallocz_array(sizeof(*c
->workers
), nb_threads
);
175 return AVERROR(ENOMEM
);
181 pthread_cond_init(&c
->current_job_cond
, NULL
);
182 pthread_cond_init(&c
->last_job_cond
, NULL
);
184 pthread_mutex_init(&c
->current_job_lock
, NULL
);
/* the lock is taken before any thread is created and is released later by
 * slice_thread_park_workers() */
185 pthread_mutex_lock(&c
->current_job_lock
);
186 for (i
= 0; i
< nb_threads
; i
++) {
187 ret
= pthread_create(&c
->workers
[i
], NULL
, worker
, c
);
189 pthread_mutex_unlock(&c
->current_job_lock
);
191 slice_thread_uninit(c
);
196 slice_thread_park_workers(c
);
198 return c
->nb_threads
;
/**
 * Set up slice threading for a filter graph.
 *
 * When graph->nb_threads is 1, thread_type is cleared (the rest of that
 * branch is not visible in this listing). Otherwise a zeroed ThreadContext
 * is allocated into graph->internal->thread, with AVERROR(ENOMEM) returned
 * if the allocation fails, and thread_init_internal() is called with the
 * requested thread count.
 *
 * On the fallback path the context is freed, thread_type is cleared,
 * nb_threads is forced to 1, and a negative ret is returned as-is while any
 * other value yields 0. Otherwise graph->nb_threads is set to the value
 * returned by thread_init_internal() and thread_execute() is installed as
 * graph->internal->thread_execute.
 *
 * NOTE(review): the condition selecting the fallback path and the final
 * return are not visible in this listing -- confirm against the full source.
 *
 * @param graph  filter graph to configure
 */
201 int ff_graph_thread_init(AVFilterGraph
*graph
)
209 if (graph
->nb_threads
== 1) {
210 graph
->thread_type
= 0;
214 graph
->internal
->thread
= av_mallocz(sizeof(ThreadContext
));
215 if (!graph
->internal
->thread
)
216 return AVERROR(ENOMEM
);
218 ret
= thread_init_internal(graph
->internal
->thread
, graph
->nb_threads
);
/* fallback: drop the pool state and run the graph single-threaded */
220 av_freep(&graph
->internal
->thread
);
221 graph
->thread_type
= 0;
222 graph
->nb_threads
= 1;
223 return (ret
< 0) ? ret
: 0;
/* record the thread count reported by thread_init_internal() */
225 graph
->nb_threads
= ret
;
227 graph
->internal
->thread_execute
= thread_execute
;
232 void ff_graph_thread_free(AVFilterGraph
*graph
)
234 if (graph
->internal
->thread
)
235 slice_thread_uninit(graph
->internal
->thread
);
236 av_freep(&graph
->internal
->thread
);