Imported Upstream version 1.4+222+hg5f9f7194267b
[deb_x265.git] / source / common / threadpool.cpp
CommitLineData
72b9787e 1/*****************************************************************************
72b9787e
JB
2 * Copyright (C) 2013 x265 project
3 *
4 * Authors: Steve Borho <steve@borho.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
19 *
20 * This program is also available under a commercial proprietary license.
21 * For more information, contact us at license @ x265.com
22 *****************************************************************************/
23
24#include "common.h"
25#include "threadpool.h"
26#include "threading.h"
27
28#include <new>
29
30#if MACOS
31#include <sys/param.h>
32#include <sys/sysctl.h>
33#endif
34
35namespace x265 {
36// x265 private namespace
37
38class ThreadPoolImpl;
39
40class PoolThread : public Thread
41{
42private:
43
44 ThreadPoolImpl &m_pool;
45
46 PoolThread& operator =(const PoolThread&);
47
48 int m_id;
49
50 bool m_dirty;
51
52 bool m_exited;
53
54 Event m_wakeEvent;
55
56public:
57
58 PoolThread(ThreadPoolImpl& pool, int id)
59 : m_pool(pool)
60 , m_id(id)
61 , m_dirty(false)
62 , m_exited(false)
63 {
64 }
65
66 bool isDirty() const { return m_dirty; }
67
68 void markDirty() { m_dirty = true; }
69
70 bool isExited() const { return m_exited; }
71
72 void poke() { m_wakeEvent.trigger(); }
73
74 virtual ~PoolThread() {}
75
76 void threadMain();
77};
78
79class ThreadPoolImpl : public ThreadPool
80{
81private:
82
83 bool m_ok;
84 int m_referenceCount;
85 int m_numThreads;
86 int m_numSleepMapWords;
87 PoolThread *m_threads;
b53f7c52 88 volatile uint32_t *m_sleepMap;
72b9787e
JB
89
90 /* Lock for write access to the provider lists. Threads are
91 * always allowed to read m_firstProvider and follow the
92 * linked list. Providers must zero their m_nextProvider
93 * pointers before removing themselves from this list */
94 Lock m_writeLock;
95
96public:
97
98 static ThreadPoolImpl *s_instance;
99 static Lock s_createLock;
100
101 JobProvider *m_firstProvider;
102 JobProvider *m_lastProvider;
103
104public:
105
106 ThreadPoolImpl(int numthreads);
107
108 virtual ~ThreadPoolImpl();
109
110 ThreadPoolImpl *AddReference()
111 {
112 m_referenceCount++;
113
114 return this;
115 }
116
117 void markThreadAsleep(int id);
118
119 void waitForAllIdle();
120
121 int getThreadCount() const { return m_numThreads; }
122
123 bool IsValid() const { return m_ok; }
124
125 void release();
126
127 void Stop();
128
129 void enqueueJobProvider(JobProvider &);
130
131 void dequeueJobProvider(JobProvider &);
132
133 void FlushProviderList();
134
135 void pokeIdleThread();
136};
137
138void PoolThread::threadMain()
139{
140#if _WIN32
141 SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
142#else
143 __attribute__((unused)) int val = nice(10);
144#endif
145
146 while (m_pool.IsValid())
147 {
148 /* Walk list of job providers, looking for work */
149 JobProvider *cur = m_pool.m_firstProvider;
150 while (cur)
151 {
152 // FindJob() may perform actual work and return true. If
153 // it does we restart the job search
154 if (cur->findJob(m_id) == true)
155 break;
156
157 cur = cur->m_nextProvider;
158 }
159
160 // this thread has reached the end of the provider list
161 m_dirty = false;
162
163 if (cur == NULL)
164 {
165 m_pool.markThreadAsleep(m_id);
166 m_wakeEvent.wait();
167 }
168 }
169
170 m_exited = true;
171}
172
173void ThreadPoolImpl::markThreadAsleep(int id)
174{
b53f7c52
JB
175 int word = id >> 5;
176 uint32_t bit = 1 << (id & 31);
72b9787e
JB
177
178 ATOMIC_OR(&m_sleepMap[word], bit);
179}
180
181void ThreadPoolImpl::pokeIdleThread()
182{
183 /* Find a bit in the sleeping thread bitmap and poke it awake, do
184 * not give up until a thread is awakened or all of them are awake */
185 for (int i = 0; i < m_numSleepMapWords; i++)
186 {
b53f7c52 187 uint32_t oldval = m_sleepMap[i];
72b9787e
JB
188 while (oldval)
189 {
190 unsigned long id;
b53f7c52 191 CTZ(id, oldval);
72b9787e 192
b53f7c52
JB
193 uint32_t bit = 1 << id;
194 if (ATOMIC_AND(&m_sleepMap[i], ~bit) & bit)
72b9787e 195 {
b53f7c52 196 m_threads[i * 32 + id].poke();
72b9787e
JB
197 return;
198 }
199
200 oldval = m_sleepMap[i];
201 }
202 }
203}
204
205ThreadPoolImpl *ThreadPoolImpl::s_instance;
206Lock ThreadPoolImpl::s_createLock;
207
208/* static */
209ThreadPool *ThreadPool::allocThreadPool(int numthreads)
210{
211 if (ThreadPoolImpl::s_instance)
212 return ThreadPoolImpl::s_instance->AddReference();
213
214 /* acquire the lock to create the instance */
215 ThreadPoolImpl::s_createLock.acquire();
216
217 if (ThreadPoolImpl::s_instance)
218 /* pool was allocated while we waited for the lock */
219 ThreadPoolImpl::s_instance->AddReference();
220 else
221 ThreadPoolImpl::s_instance = new ThreadPoolImpl(numthreads);
222 ThreadPoolImpl::s_createLock.release();
223
224 return ThreadPoolImpl::s_instance;
225}
226
227ThreadPool *ThreadPool::getThreadPool()
228{
229 X265_CHECK(ThreadPoolImpl::s_instance, "getThreadPool() called prior to allocThreadPool()\n");
230 return ThreadPoolImpl::s_instance;
231}
232
233void ThreadPoolImpl::release()
234{
235 if (--m_referenceCount == 0)
236 {
237 X265_CHECK(this == ThreadPoolImpl::s_instance, "multiple thread pool instances detected\n");
238 ThreadPoolImpl::s_instance = NULL;
239 this->Stop();
240 delete this;
241 }
242}
243
244ThreadPoolImpl::ThreadPoolImpl(int numThreads)
245 : m_ok(false)
246 , m_referenceCount(1)
247 , m_firstProvider(NULL)
248 , m_lastProvider(NULL)
249{
b53f7c52
JB
250 m_numSleepMapWords = (numThreads + 31) >> 5;
251 m_sleepMap = X265_MALLOC(uint32_t, m_numSleepMapWords);
72b9787e
JB
252
253 char *buffer = (char*)X265_MALLOC(PoolThread, numThreads);
254 m_threads = reinterpret_cast<PoolThread*>(buffer);
255 m_numThreads = numThreads;
256
257 if (m_threads && m_sleepMap)
258 {
259 for (int i = 0; i < m_numSleepMapWords; i++)
72b9787e 260 m_sleepMap[i] = 0;
72b9787e
JB
261
262 m_ok = true;
263 int i;
264 for (i = 0; i < numThreads; i++)
265 {
266 new (buffer)PoolThread(*this, i);
267 buffer += sizeof(PoolThread);
268 if (!m_threads[i].start())
269 {
270 m_ok = false;
271 break;
272 }
273 }
274
275 if (m_ok)
72b9787e 276 waitForAllIdle();
72b9787e
JB
277 else
278 {
279 // stop threads that did start up
280 for (int j = 0; j < i; j++)
281 {
282 m_threads[j].poke();
283 m_threads[j].stop();
284 }
285 }
286 }
287}
288
289void ThreadPoolImpl::waitForAllIdle()
290{
291 if (!m_ok)
292 return;
293
294 int id = 0;
295 do
296 {
b53f7c52
JB
297 int word = id >> 5;
298 uint32_t bit = 1 << (id & 31);
72b9787e 299 if (m_sleepMap[word] & bit)
72b9787e 300 id++;
72b9787e
JB
301 else
302 {
303 GIVE_UP_TIME();
304 }
305 }
306 while (id < m_numThreads);
307}
308
309void ThreadPoolImpl::Stop()
310{
311 if (m_ok)
312 {
313 waitForAllIdle();
314
315 // set invalid flag, then wake them up so they exit their main func
316 m_ok = false;
317 for (int i = 0; i < m_numThreads; i++)
318 {
319 m_threads[i].poke();
320 m_threads[i].stop();
321 }
322 }
323}
324
325ThreadPoolImpl::~ThreadPoolImpl()
326{
327 X265_FREE((void*)m_sleepMap);
328
329 if (m_threads)
330 {
331 // cleanup thread handles
332 for (int i = 0; i < m_numThreads; i++)
72b9787e 333 m_threads[i].~PoolThread();
72b9787e
JB
334
335 X265_FREE(reinterpret_cast<char*>(m_threads));
336 }
337}
338
339void ThreadPoolImpl::enqueueJobProvider(JobProvider &p)
340{
341 // only one list writer at a time
342 ScopedLock l(m_writeLock);
343
344 p.m_nextProvider = NULL;
345 p.m_prevProvider = m_lastProvider;
346 m_lastProvider = &p;
347
348 if (p.m_prevProvider)
349 p.m_prevProvider->m_nextProvider = &p;
350 else
351 m_firstProvider = &p;
352}
353
354void ThreadPoolImpl::dequeueJobProvider(JobProvider &p)
355{
356 // only one list writer at a time
357 ScopedLock l(m_writeLock);
358
359 // update pool entry pointers first
360 if (m_firstProvider == &p)
361 m_firstProvider = p.m_nextProvider;
362
363 if (m_lastProvider == &p)
364 m_lastProvider = p.m_prevProvider;
365
366 // extract self from doubly linked lists
367 if (p.m_nextProvider)
368 p.m_nextProvider->m_prevProvider = p.m_prevProvider;
369
370 if (p.m_prevProvider)
371 p.m_prevProvider->m_nextProvider = p.m_nextProvider;
372
373 p.m_nextProvider = NULL;
374 p.m_prevProvider = NULL;
375}
376
377/* Ensure all threads have made a full pass through the provider list, ensuring
378 * dequeued providers are safe for deletion. */
379void ThreadPoolImpl::FlushProviderList()
380{
381 for (int i = 0; i < m_numThreads; i++)
382 {
383 m_threads[i].markDirty();
384 m_threads[i].poke();
385 }
386
387 int i;
388 do
389 {
390 for (i = 0; i < m_numThreads; i++)
391 {
392 if (m_threads[i].isDirty())
393 {
394 GIVE_UP_TIME();
395 break;
396 }
397 }
398 }
399 while (i < m_numThreads);
400}
401
402void JobProvider::flush()
403{
404 if (m_nextProvider || m_prevProvider)
405 dequeue();
406 dynamic_cast<ThreadPoolImpl*>(m_pool)->FlushProviderList();
407}
408
409void JobProvider::enqueue()
410{
411 // Add this provider to the end of the thread pool's job provider list
412 X265_CHECK(!m_nextProvider && !m_prevProvider && m_pool, "job provider was already queued\n");
413 m_pool->enqueueJobProvider(*this);
414 m_pool->pokeIdleThread();
415}
416
417void JobProvider::dequeue()
418{
419 // Remove this provider from the thread pool's job provider list
420 m_pool->dequeueJobProvider(*this);
421 // Ensure no jobs were missed while the provider was being removed
422 m_pool->pokeIdleThread();
423}
424
/* Return the number of processors to size the thread pool with.
 *
 * The result is always at least 1 on the unix and MACOS paths: a failed or
 * nonsensical query falls back to 1 rather than handing a zero, negative or
 * uninitialised count to the caller.  Platforms with no known query report 2. */
int getCpuCount()
{
#if _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__
    // sysconf() reports -1 on error
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    return online < 1 ? 1 : static_cast<int>(online);
#elif MACOS
    int nm[2];
    uint32_t count = 0;
    size_t len = sizeof(count);

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    if (sysctl(nm, 2, &count, &len, NULL, 0) != 0)
        count = 0;

    if (count < 1)
    {
        // fall back to the total processor count
        nm[1] = HW_NCPU;
        len = sizeof(count);
        if (sysctl(nm, 2, &count, &len, NULL, 0) != 0 || count < 1)
            count = 1;
    }

    return static_cast<int>(count);
#else // if _WIN32
    return 2; // default to 2 threads, everywhere else
#endif // if _WIN32
}
455} // end namespace x265