1 /*****************************************************************************
2 * x265: singleton thread pool and interface classes
3 *****************************************************************************
4 * Copyright (C) 2013 x265 project
6 * Authors: Steve Borho <steve@borho.org>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
22 * This program is also available under a commercial proprietary license.
23 * For more information, contact us at license @ x265.com
24 *****************************************************************************/
27 #include "threadpool.h"
28 #include "threading.h"
33 #include <sys/param.h>
34 #include <sys/sysctl.h>
38 // x265 private namespace
// PoolThread: a worker thread (derives from Thread) bound to one
// ThreadPoolImpl. It keeps a reference to its owning pool and exposes small
// accessors for a "dirty" flag, an "exited" flag and a wake event.
// NOTE(review): this excerpt drops many lines of the class (the embedded
// numbering jumps 42 -> 46 -> 48 -> 60 -> 68 ...): the opening brace, access
// specifiers, the m_id / m_dirty / m_exited / m_wakeEvent declarations, the
// constructor body and the threadMain() declaration are not visible here.
// ThreadPoolImpl is used before its definition, so a forward declaration
// presumably sits in the dropped lines.
42 class PoolThread
: public Thread
// Owning pool. Held by reference, so the pool object must outlive this one.
46 ThreadPoolImpl
&m_pool
;
// Copy assignment is declared here; presumably private and left undefined to
// forbid assignment (the access specifier is not visible) -- confirm.
48 PoolThread
& operator =(const PoolThread
&);
// Constructor: takes the owning pool and this thread's index in the pool
// (the pool constructs thread i as PoolThread(*this, i)).
// NOTE(review): the initializer list / body is not in this excerpt.
60 PoolThread(ThreadPoolImpl
& pool
, int id
)
// Dirty flag accessors. Only the setter (to true) is visible here; whatever
// clears the flag is not shown -- presumably the worker loop.
68 bool isDirty() const { return m_dirty
; }
70 void markDirty() { m_dirty
= true; }
// Reports whether the thread has flagged itself as exited (writer not shown).
72 bool isExited() const { return m_exited
; }
// Trigger the wake event, presumably releasing the thread from a wait on
// m_wakeEvent in its main loop (that wait is not in this excerpt).
74 void poke() { m_wakeEvent
.trigger(); }
76 virtual ~PoolThread() {}
// ThreadPoolImpl: the concrete ThreadPool. Owns an array of PoolThread
// objects, a bitmap of sleeping threads, and a doubly linked list of
// JobProviders running from m_firstProvider to m_lastProvider.
// NOTE(review): this excerpt drops lines 82-87, 96-99, 102, 105-107 and the
// AddReference() body (113-117). The declarations of m_numThreads, m_ok,
// m_referenceCount and the provider-list write lock used elsewhere are
// presumably among them. The release() and Stop() declarations are also not
// visible here.
81 class ThreadPoolImpl
: public ThreadPool
// Number of 64-bit words in m_sleepMap: (numThreads + 63) >> 6.
88 int m_numSleepMapWords
;
// Array of PoolThread objects built with placement new inside one raw
// allocation (see the constructor).
89 PoolThread
*m_threads
;
// One bit per thread: thread index = (word << 6) | bit. A set bit means the
// thread has marked itself asleep; pokeIdleThread() clears it before waking
// the thread.
// NOTE(review): volatile gives no atomicity or ordering guarantees in C++.
// The read-modify-writes visible in this excerpt go through ATOMIC_OR /
// ATOMIC_CAS, whose definitions are not shown -- confirm they provide the
// needed barriers.
90 volatile uint64_t *m_sleepMap
;
92 /* Lock for write access to the provider lists. Threads are
93 * always allowed to read m_firstProvider and follow the
94 * linked list. Providers must zero their m_nextProvider
95 * pointers before removing themselves from this list */
// NOTE(review): the lock described above (m_writeLock, taken by
// enqueueJobProvider/dequeueJobProvider) is declared on a line dropped from
// this excerpt.
// Process-wide singleton pointer and the lock serializing its creation in
// ThreadPool::allocThreadPool().
100 static ThreadPoolImpl
*s_instance
;
101 static Lock s_createLock
;
// Head and tail of the job-provider list; NULL when the list is empty.
103 JobProvider
*m_firstProvider
;
104 JobProvider
*m_lastProvider
;
108 ThreadPoolImpl(int numthreads
);
110 virtual ~ThreadPoolImpl();
// Presumably increments m_referenceCount and returns this; the body is not
// in this excerpt -- confirm.
112 ThreadPoolImpl
*AddReference()
// Set thread `id`'s bit in m_sleepMap.
119 void markThreadAsleep(int id
);
121 void waitForAllIdle();
123 int getThreadCount() const { return m_numThreads
; }
// Worker threads keep running their main loop while this returns true;
// Stop() presumably clears m_ok.
125 bool IsValid() const { return m_ok
; }
// Provider-list maintenance; both run under the list write lock.
131 void enqueueJobProvider(JobProvider
&);
133 void dequeueJobProvider(JobProvider
&);
// Wait until every worker has made a full pass over the provider list, so
// dequeued providers are safe to delete.
135 void FlushProviderList();
// Try to wake one sleeping worker; gives up only when one is awakened or
// none are asleep.
137 void pokeIdleThread();
// Worker-thread entry point. It first lowers its own scheduling priority.
// Then, while the pool is valid, it walks the pool's job-provider list from
// the head, calling findJob(m_id) on each provider. When it reaches the end
// of the list it marks itself asleep in the pool's sleep bitmap.
// NOTE(review): many lines are missing from this excerpt (141-142, 144,
// 146-147, 149, 152-153, 157-158, 160-161, 163-166, 168-174). Not shown:
// the preprocessor guards choosing between the Windows and POSIX priority
// calls, the inner loop header, the restart after a successful findJob(),
// and whatever follows markThreadAsleep(). The last presumably includes a
// wait on m_wakeEvent and updates to m_dirty / m_exited.
140 void PoolThread::threadMain()
// Win32: run pool workers below normal priority.
143 SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL
);
// POSIX counterpart; nice()'s result is intentionally discarded.
145 __attribute__((unused
)) int val
= nice(10);
148 while (m_pool
.IsValid())
150 /* Walk list of job providers, looking for work */
// Per ThreadPoolImpl's lock comment, workers may read the provider list
// without taking the write lock.
151 JobProvider
*cur
= m_pool
.m_firstProvider
;
154 // FindJob() may perform actual work and return true. If
155 // it does we restart the job search
156 if (cur
->findJob(m_id
) == true)
159 cur
= cur
->m_nextProvider
;
162 // this thread has reached the end of the provider list
// Publish this thread's idle state so pokeIdleThread() can find and wake it.
167 m_pool
.markThreadAsleep(m_id
);
// Record thread `id` as asleep by atomically OR-ing bit (id & 63) into
// m_sleepMap[word].
// NOTE(review): `word` is not declared in this excerpt (line 177 is
// missing). It is presumably id >> 6, matching the (i << 6) | id indexing in
// pokeIdleThread() -- confirm.
// NOTE(review): when (id & 63) == 63, 1LL << 63 shifts into the sign bit of
// a signed type. That is undefined in C and C++11, yields an
// implementation-defined value in C++14/17, and is well-defined only from
// C++20. 1ULL would avoid the issue.
175 void ThreadPoolImpl::markThreadAsleep(int id
)
178 uint64_t bit
= 1LL << (id
& 63);
180 ATOMIC_OR(&m_sleepMap
[word
], bit
);
// Wake one sleeping worker. The sleep bitmap is scanned word by word; a set
// bit is cleared with a compare-and-swap and the matching thread
// (index (i << 6) | id) is poked. When the CAS loses a race, the word is
// re-read, presumably so the attempt can be retried.
// NOTE(review): lines 188, 190-194, 197, 199-201 and 203-205 are missing
// from this excerpt. `id` (the bit index within the word) is not declared
// here; presumably it is picked from the set bits of oldval by an inner loop
// or a count-trailing-zeros. The return after a successful poke() is also
// not visible -- confirm.
// NOTE(review): 1LL << id with id == 63 shifts into the sign bit of a signed
// type (not well-defined before C++20); 1ULL would be cleaner.
183 void ThreadPoolImpl::pokeIdleThread()
185 /* Find a bit in the sleeping thread bitmap and poke it awake, do
186 * not give up until a thread is awakened or all of them are awake */
187 for (int i
= 0; i
< m_numSleepMapWords
; i
++)
189 uint64_t oldval
= m_sleepMap
[i
];
// Candidate value with this thread's sleep bit cleared.
195 uint64_t newval
= oldval
& ~(1LL << id
);
// Only the thread whose CAS succeeds gets to wake this worker.
196 if (ATOMIC_CAS(&m_sleepMap
[i
], oldval
, newval
) == oldval
)
198 m_threads
[(i
<< 6) | id
].poke();
// CAS failed (another thread changed the word): reload the current value.
202 oldval
= m_sleepMap
[i
];
// Out-of-class definitions of the singleton pointer and its creation lock.
// s_instance has static storage duration and no initializer, so it starts
// out zero-initialized (NULL).
207 ThreadPoolImpl
*ThreadPoolImpl::s_instance
;
208 Lock
ThreadPoolImpl::s_createLock
;
// Return the process-wide thread pool, creating it with `numthreads` workers
// on first use. If a pool already exists it is returned with an extra
// reference (AddReference()), and on that fast path `numthreads` is ignored.
// The function checks s_instance, takes s_createLock, then re-checks.
// NOTE(review): the embedded numbering skips 222. As shown, the `new` at 223
// would run even when the re-check found an instance, overwriting
// s_instance. Presumably the dropped line 222 is an `else` -- confirm.
// NOTE(review): the first check reads s_instance without holding
// s_createLock, and s_instance is a plain pointer. That read races with the
// write below and with release(), which sets s_instance to NULL without this
// lock.
211 ThreadPool
*ThreadPool::allocThreadPool(int numthreads
)
213 if (ThreadPoolImpl::s_instance
)
214 return ThreadPoolImpl::s_instance
->AddReference();
216 /* acquire the lock to create the instance */
217 ThreadPoolImpl::s_createLock
.acquire();
219 if (ThreadPoolImpl::s_instance
)
220 /* pool was allocated while we waited for the lock */
221 ThreadPoolImpl::s_instance
->AddReference();
223 ThreadPoolImpl::s_instance
= new ThreadPoolImpl(numthreads
);
224 ThreadPoolImpl::s_createLock
.release();
226 return ThreadPoolImpl::s_instance
;
// Return the existing pool without adding a reference. X265_CHECK reports a
// call made before allocThreadPool(). It is presumably a debug-only check,
// in which case release builds can return NULL here -- confirm.
229 ThreadPool
*ThreadPool::getThreadPool()
231 X265_CHECK(ThreadPoolImpl::s_instance
, "getThreadPool() called prior to allocThreadPool()\n");
232 return ThreadPoolImpl::s_instance
;
// Drop one reference. When the count reaches zero, check that this is the
// registered singleton and clear s_instance.
// NOTE(review): lines 236, 238 and 241-245 are missing. Presumably the pool
// is stopped and deleted after s_instance is cleared -- confirm.
// NOTE(review): whether --m_referenceCount is atomic depends on the member's
// declared type, which is not visible. s_instance is cleared without holding
// s_createLock, which ThreadPool::allocThreadPool() uses for creation.
235 void ThreadPoolImpl::release()
237 if (--m_referenceCount
== 0)
239 X265_CHECK(this == ThreadPoolImpl::s_instance
, "multiple thread pool instances detected\n");
240 ThreadPoolImpl::s_instance
= NULL
;
// Construct the pool. It allocates the sleep bitmap ((numThreads + 63) >> 6
// words) and raw storage for numThreads PoolThread objects, then
// placement-constructs and starts each thread in turn. If a start() fails,
// the threads that did start are stopped.
// NOTE(review): lines 247, 251, 254, 258, 260, 262-267, 269, 273-284 and
// everything after 286 are missing from this excerpt. Not shown: the first
// member initializer, the sleep-map initialization loop body, the
// declaration of `i` used at 268, the start() failure handling and the body
// of the cleanup loop.
// NOTE(review): suppose thread construction stops at index i after a
// start() failure while m_numThreads still equals numThreads. Then
// ~ThreadPoolImpl() would run ~PoolThread() on slots that were never
// constructed. Confirm the (not shown) failure path accounts for this.
246 ThreadPoolImpl::ThreadPoolImpl(int numThreads
)
248 , m_referenceCount(1)
249 , m_firstProvider(NULL
)
250 , m_lastProvider(NULL
)
// One bit per thread, rounded up to whole 64-bit words.
252 m_numSleepMapWords
= (numThreads
+ 63) >> 6;
253 m_sleepMap
= X265_MALLOC(uint64_t, m_numSleepMapWords
);
// Raw, unconstructed storage. Objects are created below with placement new
// and must be destroyed with explicit destructor calls.
255 char *buffer
= (char*)X265_MALLOC(PoolThread
, numThreads
);
256 m_threads
= reinterpret_cast<PoolThread
*>(buffer
);
257 m_numThreads
= numThreads
;
259 if (m_threads
&& m_sleepMap
)
// Sleep-map initialization (loop body not in this excerpt).
261 for (int i
= 0; i
< m_numSleepMapWords
; i
++)
268 for (i
= 0; i
< numThreads
; i
++)
// Construct thread i in place, advance the raw cursor, then start it.
270 new (buffer
)PoolThread(*this, i
);
271 buffer
+= sizeof(PoolThread
);
272 if (!m_threads
[i
].start())
285 // stop threads that did start up
286 for (int j
= 0; j
< i
; j
++)
// Appears to step through thread ids until `id` reaches m_numThreads,
// testing each thread's bit in m_sleepMap.
// NOTE(review): most of this function is missing from the excerpt (296-303,
// 306-313). Not shown: the declarations of `id` and `word`, the `do` paired
// with the trailing `while`, and what happens when a bit is or is not set.
// Its exact waiting behavior cannot be confirmed from here.
// NOTE(review): 1LL << (id & 63) shifts into the sign bit of a signed type
// when (id & 63) == 63; 1ULL would avoid that.
295 void ThreadPoolImpl::waitForAllIdle()
304 uint64_t bit
= 1LL << (id
& 63);
305 if (m_sleepMap
[word
] & bit
)
314 while (id
< m_numThreads
);
// Shut the pool down. As the comment below says, it clears the pool's valid
// flag and then wakes every worker so it leaves its main loop.
// NOTE(review): lines 318-322, 324 and 326-332 are missing. The flag write
// and the per-thread loop body (presumably poke() plus a join/stop) are not
// visible here.
317 void ThreadPoolImpl::Stop()
323 // set invalid flag, then wake them up so they exit their main func
325 for (int i
= 0; i
< m_numThreads
; i
++)
// Free the sleep bitmap, run ~PoolThread() on each of the m_numThreads
// objects (they were built with placement new), then free their raw
// storage.
// NOTE(review): lines 334, 336-338, 341 and 343-344 are missing. Any NULL
// check on m_threads, or any Stop() before destruction, is not visible here.
// The destructor loop assumes all m_numThreads slots were constructed.
333 ThreadPoolImpl::~ThreadPoolImpl()
// The (void*) cast strips volatile, presumably because X265_FREE takes a
// plain pointer.
335 X265_FREE((void*)m_sleepMap
);
339 // cleanup thread handles
340 for (int i
= 0; i
< m_numThreads
; i
++)
// Explicit destructor call, pairing with the placement new in the
// constructor.
342 m_threads
[i
].~PoolThread();
345 X265_FREE(reinterpret_cast<char*>(m_threads
));
// Append provider `p` to the tail of the job-provider list while holding
// m_writeLock.
// NOTE(review): the embedded numbering skips 356 and 360. As shown,
// m_lastProvider is never advanced and m_firstProvider is overwritten on
// every enqueue. Presumably line 356 is `m_lastProvider = &p;` and line 360
// is an `else` guarding line 361 -- confirm against the full file.
// NOTE(review): a provider enqueued into an empty list ends up with both
// links NULL. Code that tests "queued" via m_nextProvider || m_prevProvider
// cannot see it.
349 void ThreadPoolImpl::enqueueJobProvider(JobProvider
&p
)
351 // only one list writer at a time
352 ScopedLock
l(m_writeLock
);
// p becomes the new tail: no successor, and its predecessor is the old tail.
// These fields are set before p is linked in, because workers may read the
// list without the lock.
354 p
.m_nextProvider
= NULL
;
355 p
.m_prevProvider
= m_lastProvider
;
// Link the old tail (if any) forward to p.
358 if (p
.m_prevProvider
)
359 p
.m_prevProvider
->m_nextProvider
= &p
;
361 m_firstProvider
= &p
;
// Unlink provider `p` from the job-provider list while holding m_writeLock.
// It fixes up the head/tail pointers and p's neighbours, then clears p's own
// links. If p is not in the list (not head/tail, both links NULL), nothing
// changes.
// Workers walk the list without the lock, so one may presumably still be
// positioned on p after this returns. JobProvider::flush() calls
// FlushProviderList(), whose comment says it makes dequeued providers safe
// to delete.
364 void ThreadPoolImpl::dequeueJobProvider(JobProvider
&p
)
366 // only one list writer at a time
367 ScopedLock
l(m_writeLock
);
369 // update pool entry pointers first
370 if (m_firstProvider
== &p
)
371 m_firstProvider
= p
.m_nextProvider
;
373 if (m_lastProvider
== &p
)
374 m_lastProvider
= p
.m_prevProvider
;
376 // extract self from doubly linked lists
377 if (p
.m_nextProvider
)
378 p
.m_nextProvider
->m_prevProvider
= p
.m_prevProvider
;
380 if (p
.m_prevProvider
)
381 p
.m_prevProvider
->m_nextProvider
= p
.m_nextProvider
;
// Clear p's links; a worker that later reads p.m_nextProvider sees NULL and
// ends its pass.
383 p
.m_nextProvider
= NULL
;
384 p
.m_prevProvider
= NULL
;
387 /* Ensure all threads have made a full pass through the provider list, ensuring
388 * dequeued providers are safe for deletion. */
// Marks every worker dirty, then appears to rescan the workers until none
// reports isDirty(). Nothing visible in this function clears the flag.
// NOTE(review): lines 390, 392, 394-399, 401 and 403-408 are missing. Not
// shown: the declaration of `i` used by the second loop, the `do` paired
// with the trailing `while`, and what the scan does on finding a dirty
// thread (presumably poke it and/or restart the scan) -- confirm.
389 void ThreadPoolImpl::FlushProviderList()
391 for (int i
= 0; i
< m_numThreads
; i
++)
393 m_threads
[i
].markDirty();
400 for (i
= 0; i
< m_numThreads
; i
++)
402 if (m_threads
[i
].isDirty())
409 while (i
< m_numThreads
);
// Make sure the pool's workers have finished a full pass over the provider
// list by calling ThreadPoolImpl::FlushProviderList().
// NOTE(review): the embedded numbering skips 413 and 415. As shown, the `if`
// guards the FlushProviderList() call. If the dropped line 415 holds a
// statement (presumably dequeue()), the `if` guards that instead and the
// flush is unconditional -- confirm.
// NOTE(review): a provider that is the only entry in the list has both links
// NULL (enqueueJobProvider sets m_nextProvider = NULL and m_prevProvider to
// the old, empty tail). This test therefore treats it as "not queued".
// NOTE(review): dynamic_cast yields NULL if m_pool is not a ThreadPoolImpl,
// and the result is dereferenced without a check.
412 void JobProvider::flush()
414 if (m_nextProvider
|| m_prevProvider
)
416 dynamic_cast<ThreadPoolImpl
*>(m_pool
)->FlushProviderList();
// Append this provider to its pool's job-provider list, then wake one idle
// worker so the new work gets picked up. X265_CHECK verifies that the
// provider has a pool and no links (i.e. is not already queued).
// NOTE(review): a provider that is the sole list entry also has both links
// NULL, so this "already queued" check cannot catch a double enqueue in that
// case.
419 void JobProvider::enqueue()
421 // Add this provider to the end of the thread pool's job provider list
422 X265_CHECK(!m_nextProvider
&& !m_prevProvider
&& m_pool
, "job provider was already queued\n");
423 m_pool
->enqueueJobProvider(*this);
424 m_pool
->pokeIdleThread();
// Remove this provider from its pool's job-provider list, then wake an idle
// worker so that no job is missed during the removal.
427 void JobProvider::dequeue()
429 // Remove this provider from the thread pool's job provider list
430 m_pool
->dequeueJobProvider(*this);
431 // Ensure no jobs were missed while the provider was being removed
432 m_pool
->pokeIdleThread();
439 GetSystemInfo(&sysinfo
);
440 return sysinfo
.dwNumberOfProcessors
;
442 return sysconf(_SC_NPROCESSORS_ONLN
);
450 sysctl(nm
, 2, &count
, &len
, NULL
, 0);
455 sysctl(nm
, 2, &count
, &len
, NULL
, 0);
462 return 2; // default to 2 threads, everywhere else
465 } // end namespace x265