Files
@ 73bb99cf4331
Branch filter:
Location: libtransport.git/src/threadpool.cpp - annotation
73bb99cf4331
2.6 KiB
text/x-c++hdr
Merge branch 'libtwitcurl_fix'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | b3f59f9bb668 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c | #include "transport/threadpool.h"
DEFINE_LOGGER(logger, "ThreadPool")
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
{
LOG4CXX_INFO(logger, "Starting thread " << wid)
t->run();
loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), boost::shared_ptr<Swift::EventOwner>());
}
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
this->loop = loop;
activeThreads = 0;
worker = new boost::thread*[MAX_THREADS];
for(int i=0 ; i<MAX_THREADS ; i++) {
worker[i] = NULL;
freeThreads.push(i);
}
onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
ThreadPool::~ThreadPool()
{
for(int i=0; i<MAX_THREADS ; i++) {
if(worker[i]) {
delete worker[i];
}
}
delete worker;
while(!requestQueue.empty()) {
Thread *t = requestQueue.front(); requestQueue.pop();
delete t;
}
}
int ThreadPool::getActiveThreadCount()
{
int res;
count_lock.lock();
res = activeThreads;
count_lock.unlock();
return res;
}
void ThreadPool::updateActiveThreadCount(int k)
{
count_lock.lock();
activeThreads += k;
count_lock.unlock();
}
int ThreadPool::getFreeThread()
{
int res = -1;
pool_lock.lock();
if(!freeThreads.empty()){
res = freeThreads.front();
freeThreads.pop();
updateActiveThreadCount(-1);
}
pool_lock.unlock();
return res;
}
void ThreadPool::releaseThread(int i)
{
if(i < 0 || i > MAX_THREADS) return;
pool_lock.lock();
delete worker[i];
worker[i] = NULL;
freeThreads.push(i);
updateActiveThreadCount(1);
pool_lock.unlock();
}
void ThreadPool::cleandUp(Thread *t, int wid)
{
LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
t->finalize();
delete t;
releaseThread(wid);
onWorkerAvailable();
}
void ThreadPool::scheduleFromQueue()
{
criticalregion.lock();
while(!requestQueue.empty()) {
int w = getFreeThread();
if(w == -1) break;
LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
Thread *t = requestQueue.front(); requestQueue.pop();
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w, loop);
updateActiveThreadCount(-1);
}
criticalregion.unlock();
}
void ThreadPool::runAsThread(Thread *t)
{
int w;
if((w = getFreeThread()) != -1) {
LOG4CXX_INFO(logger, "Creating thread #" << w)
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w, loop);
updateActiveThreadCount(-1);
}
else {
LOG4CXX_INFO(logger, "No workers available! adding to queue.")
requestQueue.push(t);
}
}
|