Files
@ 67f274fe41e2
Branch filter:
Location: libtransport.git/libtransport/ThreadPool.cpp - annotation
67f274fe41e2
2.6 KiB
text/x-c++hdr
spectrum2_manager server: Allow registering and unregistering Spectrum 2 transport users
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | 78e71f9345c7 78e71f9345c7 78e71f9345c7 5adb3d1f9733 5adb3d1f9733 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 
2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5adb3d1f9733 5adb3d1f9733 | #include "transport/ThreadPool.h"
#include "transport/Logging.h"
namespace Transport {
DEFINE_LOGGER(logger, "ThreadPool")
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
/**
 * Entry point executed by each pool thread.
 *
 * Runs the task synchronously on the calling thread and afterwards posts an
 * event to @p loop that invokes the file-level onWorkCompleted signal with
 * (t, wid). The task itself is not freed here.
 *
 * @param t    task whose run() is executed
 * @param wid  worker slot index, forwarded unchanged to onWorkCompleted
 * @param loop event loop that receives the completion event
 */
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
{
    LOG4CXX_INFO(logger, "Starting thread " << wid)

    t->run();

    // The completion event is posted with an empty owner pointer.
    // NOTE(review): presumably the loop invokes the callback on its own
    // thread, which keeps clean-up off the worker thread - confirm.
    boost::shared_ptr<Swift::EventOwner> noOwner;
    loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), noOwner);
}
/**
 * Builds a pool with @p maxthreads worker slots.
 *
 * Allocates the slot table, marks every slot as free, and wires the
 * completion signal to cleandUp() and the worker-available signal to
 * scheduleFromQueue().
 *
 * @param loop       event loop later handed to every worker thread
 * @param maxthreads number of worker slots; stored in MAX_THREADS
 */
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
    this->loop = loop;
    activeThreads = 0;

    // One boost::thread pointer per slot; NULL means "no thread object".
    worker = new boost::thread*[MAX_THREADS];
    int slot = 0;
    while (slot < MAX_THREADS) {
        worker[slot] = NULL;
        freeThreads.push(slot);
        ++slot;
    }

    // NOTE(review): onWorkCompleted is a namespace-level signal, so every
    // ThreadPool instance connects to the same one - confirm only a single
    // pool is expected to exist.
    onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
    onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
/**
 * Releases the thread objects, the slot table, and any tasks that were still
 * waiting in the request queue.
 */
ThreadPool::~ThreadPool()
{
    // Deleting a null pointer is a no-op, so empty slots need no check.
    // NOTE(review): the threads are not joined before their objects are
    // deleted - confirm no worker can still be running at this point.
    for (int i = 0; i < MAX_THREADS; i++) {
        delete worker[i];
    }

    // The table was allocated with new[] in the constructor, so it must be
    // released with delete[]; scalar delete here is undefined behaviour.
    delete[] worker;

    // Tasks that never got a worker are owned by the queue; free them.
    while (!requestQueue.empty()) {
        Thread *t = requestQueue.front();
        requestQueue.pop();
        delete t;
    }
}
/**
 * Reads the activeThreads counter while holding count_lock.
 *
 * @return the counter value observed under the lock
 */
int ThreadPool::getActiveThreadCount()
{
    count_lock.lock();
    const int snapshot = activeThreads;
    count_lock.unlock();

    return snapshot;
}
/**
 * Adds @p k to the activeThreads counter while holding count_lock.
 *
 * @param k signed delta applied to the counter
 */
void ThreadPool::updateActiveThreadCount(int k)
{
    count_lock.lock();
    activeThreads = activeThreads + k;
    count_lock.unlock();
}
/**
 * Takes the next free worker slot, if there is one.
 *
 * Under pool_lock, pops the front of freeThreads and applies a -1 delta to
 * the activeThreads counter.
 *
 * @return the slot index, or -1 when no slot is free
 */
int ThreadPool::getFreeThread()
{
    pool_lock.lock();

    if (freeThreads.empty()) {
        pool_lock.unlock();
        return -1;
    }

    int slot = freeThreads.front();
    freeThreads.pop();
    updateActiveThreadCount(-1);

    pool_lock.unlock();
    return slot;
}
/**
 * Returns worker slot @p i to the free list.
 *
 * Under pool_lock, deletes the boost::thread object stored in the slot,
 * clears the slot, pushes the index back onto freeThreads and applies a +1
 * delta to the activeThreads counter.
 *
 * @param i slot index; values outside [0, MAX_THREADS) are ignored
 */
void ThreadPool::releaseThread(int i)
{
    // Valid slots are 0 .. MAX_THREADS-1. The upper bound must be exclusive,
    // otherwise i == MAX_THREADS would index one past the end of worker.
    if (i < 0 || i >= MAX_THREADS) return;

    pool_lock.lock();

    delete worker[i];
    worker[i] = NULL;
    freeThreads.push(i);
    updateActiveThreadCount(1);

    pool_lock.unlock();
}
/**
 * Slot connected to onWorkCompleted; finishes a task after its run() ended.
 *
 * Calls the task's finalize(), frees the task, returns its worker slot to
 * the pool and then fires onWorkerAvailable.
 *
 * @param t   completed task; deleted by this call
 * @param wid worker slot the task ran on
 */
void ThreadPool::cleandUp(Thread *t, int wid)
{
    LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())

    // finalize() must run before the task is destroyed.
    t->finalize();
    delete t;

    // Free the slot first so the signal handlers can hand it out again.
    this->releaseThread(wid);
    this->onWorkerAvailable();
}
void ThreadPool::scheduleFromQueue()
{
criticalregion.lock();
while(!requestQueue.empty()) {
int w = getFreeThread();
if(w == -1) break;
LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
Thread *t = requestQueue.front(); requestQueue.pop();
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w, loop);
updateActiveThreadCount(-1);
}
criticalregion.unlock();
}
/**
 * Runs @p t on a free worker slot, or queues it when every slot is busy.
 *
 * The "is a slot free? if not, enqueue" decision is made while holding
 * criticalregion, the same lock scheduleFromQueue() holds while draining
 * requestQueue. This keeps the queue from being modified concurrently and
 * ensures a slot released in between cannot be missed: scheduleFromQueue()
 * either runs before the check (the slot is then found free here) or after
 * the push (the task is then found in the queue).
 *
 * @param t task to execute; the pool deletes it after completion, or in the
 *          destructor if it is still queued
 */
void ThreadPool::runAsThread(Thread *t)
{
    criticalregion.lock();
    int w = getFreeThread();
    if (w == -1) {
        LOG4CXX_INFO(logger, "No workers available! adding to queue.")
        requestQueue.push(t);
    }
    criticalregion.unlock();

    if (w == -1) return;

    // Thread creation happens outside the lock so the queue is not blocked
    // while the thread is being spawned.
    LOG4CXX_INFO(logger, "Creating thread #" << w)
    t->setThreadID(w);
    worker[w] = new boost::thread(Worker, t, w, loop);

    // NOTE(review): getFreeThread() already applied a -1 delta for this
    // slot, so this is a second decrement per started task - kept as in the
    // original; confirm whether that is intended.
    updateActiveThreadCount(-1);
}
}
|