/* Imported Upstream version 1.4
 * [deb_x265.git] / source / common / threadpool.cpp
 * CommitLineData 72b9787e JB */
1/*****************************************************************************
2 * x265: singleton thread pool and interface classes
3 *****************************************************************************
4 * Copyright (C) 2013 x265 project
5 *
6 * Authors: Steve Borho <steve@borho.org>
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
21 *
22 * This program is also available under a commercial proprietary license.
23 * For more information, contact us at license @ x265.com
24 *****************************************************************************/
25
26#include "common.h"
27#include "threadpool.h"
28#include "threading.h"
29
30#include <new>
31
32#if MACOS
33#include <sys/param.h>
34#include <sys/sysctl.h>
35#endif
36
37namespace x265 {
38// x265 private namespace
39
40class ThreadPoolImpl;
41
/* Worker thread owned by the pool: sleeps until poked, then scans the
 * pool's job provider list for work (see threadMain below) */
class PoolThread : public Thread
{
private:

    ThreadPoolImpl &m_pool;   // owning pool; gives access to the provider list

    PoolThread& operator =(const PoolThread&);  // non-assignable (reference member)

    int m_id;                 // this thread's index within the pool

    bool m_dirty;             // set by FlushProviderList(); cleared after a full list pass

    bool m_exited;            // set just before threadMain() returns

    Event m_wakeEvent;        // signaled by poke() to wake this thread when sleeping

public:

    PoolThread(ThreadPoolImpl& pool, int id)
        : m_pool(pool)
        , m_id(id)
        , m_dirty(false)
        , m_exited(false)
    {
    }

    bool isDirty() const { return m_dirty; }

    void markDirty() { m_dirty = true; }

    bool isExited() const { return m_exited; }

    void poke() { m_wakeEvent.trigger(); }

    virtual ~PoolThread() {}

    void threadMain();
};
80
/* Singleton thread pool implementation.  Owns a fixed array of PoolThread
 * workers (placement-constructed into one raw buffer), a bitmap of sleeping
 * threads, and an intrusive doubly linked list of job providers */
class ThreadPoolImpl : public ThreadPool
{
private:

    bool m_ok;                      // false until all threads started, and again after Stop()
    int m_referenceCount;           // outstanding allocThreadPool()/AddReference() holders
    int m_numThreads;
    int m_numSleepMapWords;         // 64 thread bits per bitmap word
    PoolThread *m_threads;          // raw buffer of placement-constructed threads
    volatile uint64_t *m_sleepMap;  // bit set => that thread is asleep

    /* Lock for write access to the provider lists. Threads are
     * always allowed to read m_firstProvider and follow the
     * linked list. Providers must zero their m_nextProvider
     * pointers before removing themselves from this list */
    Lock m_writeLock;

public:

    static ThreadPoolImpl *s_instance;  // the process-wide singleton
    static Lock s_createLock;           // guards singleton creation

    JobProvider *m_firstProvider;
    JobProvider *m_lastProvider;

public:

    ThreadPoolImpl(int numthreads);

    virtual ~ThreadPoolImpl();

    /* Bump the reference count and return this pool.
     * NOTE(review): the increment is not atomic; presumably all callers
     * hold s_createLock or are otherwise serialized — confirm */
    ThreadPoolImpl *AddReference()
    {
        m_referenceCount++;

        return this;
    }

    // atomically set thread 'id' bit in the sleep bitmap
    void markThreadAsleep(int id);

    // spin until every thread has marked itself asleep
    void waitForAllIdle();

    int getThreadCount() const { return m_numThreads; }

    bool IsValid() const { return m_ok; }

    // drop one reference; tears the pool down when the count hits zero
    void release();

    // clear m_ok and wake/stop every worker thread
    void Stop();

    // append a provider to the intrusive list (writers serialized)
    void enqueueJobProvider(JobProvider &);

    // unlink a provider from the intrusive list (writers serialized)
    void dequeueJobProvider(JobProvider &);

    // wait until every thread completes a full pass over the provider list
    void FlushProviderList();

    // wake one sleeping thread, if any
    void pokeIdleThread();
};
139
/* Worker thread entry point: repeatedly scan the provider list doing jobs
 * until the pool is shut down; sleep whenever a full pass finds no work */
void PoolThread::threadMain()
{
#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    /* lower our scheduling priority; return value deliberately ignored */
    __attribute__((unused)) int val = nice(10);
#endif

    while (m_pool.IsValid())
    {
        /* Walk list of job providers, looking for work */
        JobProvider *cur = m_pool.m_firstProvider;
        while (cur)
        {
            // FindJob() may perform actual work and return true. If
            // it does we restart the job search
            if (cur->findJob(m_id) == true)
                break;

            cur = cur->m_nextProvider;
        }

        // this thread has reached the end of the provider list
        // (clearing m_dirty here is what FlushProviderList() spins on)
        m_dirty = false;

        if (cur == NULL)
        {
            /* no work found: publish our sleep bit, then block until poked */
            m_pool.markThreadAsleep(m_id);
            m_wakeEvent.wait();
        }
    }

    m_exited = true;
}
174
175void ThreadPoolImpl::markThreadAsleep(int id)
176{
177 int word = id >> 6;
178 uint64_t bit = 1LL << (id & 63);
179
180 ATOMIC_OR(&m_sleepMap[word], bit);
181}
182
/* Wake at most one sleeping thread.  A thread's sleep bit is claimed with a
 * compare-and-swap before it is poked, so two concurrent callers cannot
 * both consume the same sleeping thread */
void ThreadPoolImpl::pokeIdleThread()
{
    /* Find a bit in the sleeping thread bitmap and poke it awake, do
     * not give up until a thread is awakened or all of them are awake */
    for (int i = 0; i < m_numSleepMapWords; i++)
    {
        uint64_t oldval = m_sleepMap[i];
        while (oldval)
        {
            unsigned long id;
            CTZ64(id, oldval);  // index of the lowest set (sleeping) bit

            uint64_t newval = oldval & ~(1LL << id);
            if (ATOMIC_CAS(&m_sleepMap[i], oldval, newval) == oldval)
            {
                // we cleared the bit, so this thread is ours to wake
                m_threads[(i << 6) | id].poke();
                return;
            }

            // lost the CAS race; reload the word and retry
            oldval = m_sleepMap[i];
        }
    }
}
206
/* Singleton pool instance and the lock that guards its creation */
ThreadPoolImpl *ThreadPoolImpl::s_instance;
Lock ThreadPoolImpl::s_createLock;
209
210/* static */
211ThreadPool *ThreadPool::allocThreadPool(int numthreads)
212{
213 if (ThreadPoolImpl::s_instance)
214 return ThreadPoolImpl::s_instance->AddReference();
215
216 /* acquire the lock to create the instance */
217 ThreadPoolImpl::s_createLock.acquire();
218
219 if (ThreadPoolImpl::s_instance)
220 /* pool was allocated while we waited for the lock */
221 ThreadPoolImpl::s_instance->AddReference();
222 else
223 ThreadPoolImpl::s_instance = new ThreadPoolImpl(numthreads);
224 ThreadPoolImpl::s_createLock.release();
225
226 return ThreadPoolImpl::s_instance;
227}
228
/* Return the singleton pool; allocThreadPool() must have been called first */
ThreadPool *ThreadPool::getThreadPool()
{
    X265_CHECK(ThreadPoolImpl::s_instance, "getThreadPool() called prior to allocThreadPool()\n");
    return ThreadPoolImpl::s_instance;
}
234
/* Drop one reference; when the count reaches zero, stop the workers and
 * destroy the singleton.
 * NOTE(review): the decrement and the s_instance reset are not done under
 * s_createLock — presumably callers serialize release(); confirm */
void ThreadPoolImpl::release()
{
    if (--m_referenceCount == 0)
    {
        X265_CHECK(this == ThreadPoolImpl::s_instance, "multiple thread pool instances detected\n");
        ThreadPoolImpl::s_instance = NULL;
        this->Stop();
        delete this;  // self-delete is safe: no members are touched afterwards
    }
}
245
246ThreadPoolImpl::ThreadPoolImpl(int numThreads)
247 : m_ok(false)
248 , m_referenceCount(1)
249 , m_firstProvider(NULL)
250 , m_lastProvider(NULL)
251{
252 m_numSleepMapWords = (numThreads + 63) >> 6;
253 m_sleepMap = X265_MALLOC(uint64_t, m_numSleepMapWords);
254
255 char *buffer = (char*)X265_MALLOC(PoolThread, numThreads);
256 m_threads = reinterpret_cast<PoolThread*>(buffer);
257 m_numThreads = numThreads;
258
259 if (m_threads && m_sleepMap)
260 {
261 for (int i = 0; i < m_numSleepMapWords; i++)
262 {
263 m_sleepMap[i] = 0;
264 }
265
266 m_ok = true;
267 int i;
268 for (i = 0; i < numThreads; i++)
269 {
270 new (buffer)PoolThread(*this, i);
271 buffer += sizeof(PoolThread);
272 if (!m_threads[i].start())
273 {
274 m_ok = false;
275 break;
276 }
277 }
278
279 if (m_ok)
280 {
281 waitForAllIdle();
282 }
283 else
284 {
285 // stop threads that did start up
286 for (int j = 0; j < i; j++)
287 {
288 m_threads[j].poke();
289 m_threads[j].stop();
290 }
291 }
292 }
293}
294
295void ThreadPoolImpl::waitForAllIdle()
296{
297 if (!m_ok)
298 return;
299
300 int id = 0;
301 do
302 {
303 int word = id >> 6;
304 uint64_t bit = 1LL << (id & 63);
305 if (m_sleepMap[word] & bit)
306 {
307 id++;
308 }
309 else
310 {
311 GIVE_UP_TIME();
312 }
313 }
314 while (id < m_numThreads);
315}
316
317void ThreadPoolImpl::Stop()
318{
319 if (m_ok)
320 {
321 waitForAllIdle();
322
323 // set invalid flag, then wake them up so they exit their main func
324 m_ok = false;
325 for (int i = 0; i < m_numThreads; i++)
326 {
327 m_threads[i].poke();
328 m_threads[i].stop();
329 }
330 }
331}
332
333ThreadPoolImpl::~ThreadPoolImpl()
334{
335 X265_FREE((void*)m_sleepMap);
336
337 if (m_threads)
338 {
339 // cleanup thread handles
340 for (int i = 0; i < m_numThreads; i++)
341 {
342 m_threads[i].~PoolThread();
343 }
344
345 X265_FREE(reinterpret_cast<char*>(m_threads));
346 }
347}
348
349void ThreadPoolImpl::enqueueJobProvider(JobProvider &p)
350{
351 // only one list writer at a time
352 ScopedLock l(m_writeLock);
353
354 p.m_nextProvider = NULL;
355 p.m_prevProvider = m_lastProvider;
356 m_lastProvider = &p;
357
358 if (p.m_prevProvider)
359 p.m_prevProvider->m_nextProvider = &p;
360 else
361 m_firstProvider = &p;
362}
363
364void ThreadPoolImpl::dequeueJobProvider(JobProvider &p)
365{
366 // only one list writer at a time
367 ScopedLock l(m_writeLock);
368
369 // update pool entry pointers first
370 if (m_firstProvider == &p)
371 m_firstProvider = p.m_nextProvider;
372
373 if (m_lastProvider == &p)
374 m_lastProvider = p.m_prevProvider;
375
376 // extract self from doubly linked lists
377 if (p.m_nextProvider)
378 p.m_nextProvider->m_prevProvider = p.m_prevProvider;
379
380 if (p.m_prevProvider)
381 p.m_prevProvider->m_nextProvider = p.m_nextProvider;
382
383 p.m_nextProvider = NULL;
384 p.m_prevProvider = NULL;
385}
386
387/* Ensure all threads have made a full pass through the provider list, ensuring
388 * dequeued providers are safe for deletion. */
389void ThreadPoolImpl::FlushProviderList()
390{
391 for (int i = 0; i < m_numThreads; i++)
392 {
393 m_threads[i].markDirty();
394 m_threads[i].poke();
395 }
396
397 int i;
398 do
399 {
400 for (i = 0; i < m_numThreads; i++)
401 {
402 if (m_threads[i].isDirty())
403 {
404 GIVE_UP_TIME();
405 break;
406 }
407 }
408 }
409 while (i < m_numThreads);
410}
411
/* Remove this provider from the pool (if still queued) and wait until every
 * pool thread has completed a pass over the provider list, after which this
 * provider is safe to delete */
void JobProvider::flush()
{
    if (m_nextProvider || m_prevProvider)
        dequeue();
    // NOTE(review): dynamic_cast result is dereferenced unchecked; m_pool is
    // presumably always a ThreadPoolImpl — confirm no other subclass exists
    dynamic_cast<ThreadPoolImpl*>(m_pool)->FlushProviderList();
}
418
/* Queue this provider in the pool and wake one idle thread to go look for
 * the newly available work */
void JobProvider::enqueue()
{
    // Add this provider to the end of the thread pool's job provider list
    X265_CHECK(!m_nextProvider && !m_prevProvider && m_pool, "job provider was already queued\n");
    m_pool->enqueueJobProvider(*this);
    m_pool->pokeIdleThread();
}
426
/* Remove this provider from the pool's provider list */
void JobProvider::dequeue()
{
    // Remove this provider from the thread pool's job provider list
    m_pool->dequeueJobProvider(*this);
    // Ensure no jobs were missed while the provider was being removed
    m_pool->pokeIdleThread();
}
434
/* Detect the number of logical CPU cores available to the process.
 * Returns at least 1 on POSIX platforms (2 on unknown platforms).
 *
 * Fixed: the MACOS path used a magic buffer length of 4 instead of
 * sizeof(count) and left 'count' uninitialized when sysctl() failed; the
 * unix path could return sysconf()'s -1 error value as a core count. */
int getCpuCount()
{
#if _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__
    /* sysconf() returns -1 on failure; never report fewer than one core */
    long cpus = sysconf(_SC_NPROCESSORS_ONLN);
    return (cpus > 0) ? (int)cpus : 1;
#elif MACOS
    int nm[2];
    size_t len = sizeof(count);
    uint32_t count = 0;  // stays 0 if both sysctl() calls fail

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    if (sysctl(nm, 2, &count, &len, NULL, 0) != 0 || count < 1)
    {
        nm[1] = HW_NCPU;
        len = sizeof(count);
        if (sysctl(nm, 2, &count, &len, NULL, 0) != 0 || count < 1)
            count = 1;
    }

    return count;
#else // if _WIN32
    return 2; // default to 2 threads, everywhere else
#endif // if _WIN32
}
465} // end namespace x265