Files
@ 5b5c3b6c525c
Branch filter:
Location: libtransport.git/src/ThreadPool.cpp - annotation
5b5c3b6c525c
2.6 KiB
text/x-c++hdr
drop libyahoo backend from debian build scripts
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | 78e71f9345c7 78e71f9345c7 78e71f9345c7 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c | #include "transport/ThreadPool.h"
#include "transport/Logging.h"
DEFINE_LOGGER(logger, "ThreadPool")
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
{
LOG4CXX_INFO(logger, "Starting thread " << wid)
t->run();
loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), boost::shared_ptr<Swift::EventOwner>());
}
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
this->loop = loop;
activeThreads = 0;
worker = new boost::thread*[MAX_THREADS];
for(int i=0 ; i<MAX_THREADS ; i++) {
worker[i] = NULL;
freeThreads.push(i);
}
onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
ThreadPool::~ThreadPool()
{
for(int i=0; i<MAX_THREADS ; i++) {
if(worker[i]) {
delete worker[i];
}
}
delete worker;
while(!requestQueue.empty()) {
Thread *t = requestQueue.front(); requestQueue.pop();
delete t;
}
}
int ThreadPool::getActiveThreadCount()
{
int res;
count_lock.lock();
res = activeThreads;
count_lock.unlock();
return res;
}
void ThreadPool::updateActiveThreadCount(int k)
{
count_lock.lock();
activeThreads += k;
count_lock.unlock();
}
int ThreadPool::getFreeThread()
{
int res = -1;
pool_lock.lock();
if(!freeThreads.empty()){
res = freeThreads.front();
freeThreads.pop();
updateActiveThreadCount(-1);
}
pool_lock.unlock();
return res;
}
void ThreadPool::releaseThread(int i)
{
if(i < 0 || i > MAX_THREADS) return;
pool_lock.lock();
delete worker[i];
worker[i] = NULL;
freeThreads.push(i);
updateActiveThreadCount(1);
pool_lock.unlock();
}
void ThreadPool::cleandUp(Thread *t, int wid)
{
LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
t->finalize();
delete t;
releaseThread(wid);
onWorkerAvailable();
}
void ThreadPool::scheduleFromQueue()
{
criticalregion.lock();
while(!requestQueue.empty()) {
int w = getFreeThread();
if(w == -1) break;
LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
Thread *t = requestQueue.front(); requestQueue.pop();
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w, loop);
updateActiveThreadCount(-1);
}
criticalregion.unlock();
}
void ThreadPool::runAsThread(Thread *t)
{
int w;
if((w = getFreeThread()) != -1) {
LOG4CXX_INFO(logger, "Creating thread #" << w)
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w, loop);
updateActiveThreadCount(-1);
}
else {
LOG4CXX_INFO(logger, "No workers available! adding to queue.")
requestQueue.push(t);
}
}
|