1 /*****************************************************************************
2 * Copyright (C) 2013 x265 project
4 * Authors: Steve Borho <steve@borho.org>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
20 * This program is also available under a commercial proprietary license.
21 * For more information, contact us at license @ x265.com
22 *****************************************************************************/
25 #include "threadpool.h"
26 #include "threading.h"
31 #include <sys/param.h>
32 #include <sys/sysctl.h>
36 // x265 private namespace
40 class PoolThread
: public Thread
44 ThreadPoolImpl
&m_pool
;
46 PoolThread
& operator =(const PoolThread
&);
58 PoolThread(ThreadPoolImpl
& pool
, int id
)
66 bool isDirty() const { return m_dirty
; }
68 void markDirty() { m_dirty
= true; }
70 bool isExited() const { return m_exited
; }
72 void poke() { m_wakeEvent
.trigger(); }
74 virtual ~PoolThread() {}
79 class ThreadPoolImpl
: public ThreadPool
86 int m_numSleepMapWords
;
87 PoolThread
*m_threads
;
88 volatile uint32_t *m_sleepMap
;
90 /* Lock for write access to the provider lists. Threads are
91 * always allowed to read m_firstProvider and follow the
92 * linked list. Providers must zero their m_nextProvider
93 * pointers before removing themselves from this list */
98 static ThreadPoolImpl
*s_instance
;
99 static Lock s_createLock
;
101 JobProvider
*m_firstProvider
;
102 JobProvider
*m_lastProvider
;
106 ThreadPoolImpl(int numthreads
);
108 virtual ~ThreadPoolImpl();
110 ThreadPoolImpl
*AddReference()
117 void markThreadAsleep(int id
);
119 void waitForAllIdle();
121 int getThreadCount() const { return m_numThreads
; }
123 bool IsValid() const { return m_ok
; }
129 void enqueueJobProvider(JobProvider
&);
131 void dequeueJobProvider(JobProvider
&);
133 void FlushProviderList();
135 void pokeIdleThread();
138 void PoolThread::threadMain()
141 SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL
);
143 __attribute__((unused
)) int val
= nice(10);
146 while (m_pool
.IsValid())
148 /* Walk list of job providers, looking for work */
149 JobProvider
*cur
= m_pool
.m_firstProvider
;
152 // FindJob() may perform actual work and return true. If
153 // it does we restart the job search
154 if (cur
->findJob(m_id
) == true)
157 cur
= cur
->m_nextProvider
;
160 // this thread has reached the end of the provider list
165 m_pool
.markThreadAsleep(m_id
);
173 void ThreadPoolImpl::markThreadAsleep(int id
)
176 uint32_t bit
= 1 << (id
& 31);
178 ATOMIC_OR(&m_sleepMap
[word
], bit
);
181 void ThreadPoolImpl::pokeIdleThread()
183 /* Find a bit in the sleeping thread bitmap and poke it awake, do
184 * not give up until a thread is awakened or all of them are awake */
185 for (int i
= 0; i
< m_numSleepMapWords
; i
++)
187 uint32_t oldval
= m_sleepMap
[i
];
193 uint32_t bit
= 1 << id
;
194 if (ATOMIC_AND(&m_sleepMap
[i
], ~bit
) & bit
)
196 m_threads
[i
* 32 + id
].poke();
200 oldval
= m_sleepMap
[i
];
205 ThreadPoolImpl
*ThreadPoolImpl::s_instance
;
206 Lock
ThreadPoolImpl::s_createLock
;
209 ThreadPool
*ThreadPool::allocThreadPool(int numthreads
)
211 if (ThreadPoolImpl::s_instance
)
212 return ThreadPoolImpl::s_instance
->AddReference();
214 /* acquire the lock to create the instance */
215 ThreadPoolImpl::s_createLock
.acquire();
217 if (ThreadPoolImpl::s_instance
)
218 /* pool was allocated while we waited for the lock */
219 ThreadPoolImpl::s_instance
->AddReference();
221 ThreadPoolImpl::s_instance
= new ThreadPoolImpl(numthreads
);
222 ThreadPoolImpl::s_createLock
.release();
224 return ThreadPoolImpl::s_instance
;
227 ThreadPool
*ThreadPool::getThreadPool()
229 X265_CHECK(ThreadPoolImpl::s_instance
, "getThreadPool() called prior to allocThreadPool()\n");
230 return ThreadPoolImpl::s_instance
;
233 void ThreadPoolImpl::release()
235 if (--m_referenceCount
== 0)
237 X265_CHECK(this == ThreadPoolImpl::s_instance
, "multiple thread pool instances detected\n");
238 ThreadPoolImpl::s_instance
= NULL
;
244 ThreadPoolImpl::ThreadPoolImpl(int numThreads
)
246 , m_referenceCount(1)
247 , m_firstProvider(NULL
)
248 , m_lastProvider(NULL
)
250 m_numSleepMapWords
= (numThreads
+ 31) >> 5;
251 m_sleepMap
= X265_MALLOC(uint32_t, m_numSleepMapWords
);
253 char *buffer
= (char*)X265_MALLOC(PoolThread
, numThreads
);
254 m_threads
= reinterpret_cast<PoolThread
*>(buffer
);
255 m_numThreads
= numThreads
;
257 if (m_threads
&& m_sleepMap
)
259 for (int i
= 0; i
< m_numSleepMapWords
; i
++)
264 for (i
= 0; i
< numThreads
; i
++)
266 new (buffer
)PoolThread(*this, i
);
267 buffer
+= sizeof(PoolThread
);
268 if (!m_threads
[i
].start())
279 // stop threads that did start up
280 for (int j
= 0; j
< i
; j
++)
289 void ThreadPoolImpl::waitForAllIdle()
298 uint32_t bit
= 1 << (id
& 31);
299 if (m_sleepMap
[word
] & bit
)
306 while (id
< m_numThreads
);
309 void ThreadPoolImpl::Stop()
315 // set invalid flag, then wake them up so they exit their main func
317 for (int i
= 0; i
< m_numThreads
; i
++)
325 ThreadPoolImpl::~ThreadPoolImpl()
327 X265_FREE((void*)m_sleepMap
);
331 // cleanup thread handles
332 for (int i
= 0; i
< m_numThreads
; i
++)
333 m_threads
[i
].~PoolThread();
335 X265_FREE(reinterpret_cast<char*>(m_threads
));
339 void ThreadPoolImpl::enqueueJobProvider(JobProvider
&p
)
341 // only one list writer at a time
342 ScopedLock
l(m_writeLock
);
344 p
.m_nextProvider
= NULL
;
345 p
.m_prevProvider
= m_lastProvider
;
348 if (p
.m_prevProvider
)
349 p
.m_prevProvider
->m_nextProvider
= &p
;
351 m_firstProvider
= &p
;
354 void ThreadPoolImpl::dequeueJobProvider(JobProvider
&p
)
356 // only one list writer at a time
357 ScopedLock
l(m_writeLock
);
359 // update pool entry pointers first
360 if (m_firstProvider
== &p
)
361 m_firstProvider
= p
.m_nextProvider
;
363 if (m_lastProvider
== &p
)
364 m_lastProvider
= p
.m_prevProvider
;
366 // extract self from doubly linked lists
367 if (p
.m_nextProvider
)
368 p
.m_nextProvider
->m_prevProvider
= p
.m_prevProvider
;
370 if (p
.m_prevProvider
)
371 p
.m_prevProvider
->m_nextProvider
= p
.m_nextProvider
;
373 p
.m_nextProvider
= NULL
;
374 p
.m_prevProvider
= NULL
;
377 /* Ensure all threads have made a full pass through the provider list, ensuring
378 * dequeued providers are safe for deletion. */
379 void ThreadPoolImpl::FlushProviderList()
381 for (int i
= 0; i
< m_numThreads
; i
++)
383 m_threads
[i
].markDirty();
390 for (i
= 0; i
< m_numThreads
; i
++)
392 if (m_threads
[i
].isDirty())
399 while (i
< m_numThreads
);
402 void JobProvider::flush()
404 if (m_nextProvider
|| m_prevProvider
)
406 dynamic_cast<ThreadPoolImpl
*>(m_pool
)->FlushProviderList();
409 void JobProvider::enqueue()
411 // Add this provider to the end of the thread pool's job provider list
412 X265_CHECK(!m_nextProvider
&& !m_prevProvider
&& m_pool
, "job provider was already queued\n");
413 m_pool
->enqueueJobProvider(*this);
414 m_pool
->pokeIdleThread();
417 void JobProvider::dequeue()
419 // Remove this provider from the thread pool's job provider list
420 m_pool
->dequeueJobProvider(*this);
421 // Ensure no jobs were missed while the provider was being removed
422 m_pool
->pokeIdleThread();
429 GetSystemInfo(&sysinfo
);
430 return sysinfo
.dwNumberOfProcessors
;
432 return sysconf(_SC_NPROCESSORS_ONLN
);
440 sysctl(nm
, 2, &count
, &len
, NULL
, 0);
445 sysctl(nm
, 2, &count
, &len
, NULL
, 0);
452 return 2; // default to 2 threads, everywhere else
455 } // end namespace x265