Files
        @ 6f9fbae0907a
    
        
              Branch filter: 
        
    Location: libtransport.git/src/threadpool.cpp - annotation
        
            
            6f9fbae0907a
            2.6 KiB
            text/x-c++hdr
        
        
    
    Twitter: remove user from database even when he logged out before sending #pin
    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123  | b3f59f9bb668 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 5e19186950b9 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c 2b47d32a916c  | 
#include "transport/threadpool.h"
// Sets up the `logger` identifier that the LOG4CXX_INFO calls in this file
// pass as their first argument.
DEFINE_LOGGER(logger, "ThreadPool")
// File-scope signal carrying (finished task, worker id). Worker() posts an
// invocation of it to the event loop once the task's run() has returned, and
// the ThreadPool constructor connects ThreadPool::cleandUp to it.
// NOTE(review): this is one global shared by every ThreadPool instance, and no
// disconnect is visible in this file -- confirm that only a single pool is
// ever created and that it outlives all pending completion events.
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
// Entry function handed to boost::thread for every dispatched task.
//   t    - task to execute; its run() is called on this thread.
//   wid  - index of the worker slot this thread occupies.
//   loop - event loop that receives the completion notification.
// When run() returns, an invocation of onWorkCompleted(t, wid) is posted to
// the event loop instead of being fired directly from this thread.
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
{
	LOG4CXX_INFO(logger, "Starting thread " << wid)

	t->run();

	// An empty owner pointer is passed, exactly as a default-constructed one.
	const boost::shared_ptr<Swift::EventOwner> noOwner;
	loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), noOwner);
}
// Builds a pool with `maxthreads` worker slots.
//   loop       - event loop later handed to each Worker for completion events.
//   maxthreads - number of slots; stored in MAX_THREADS.
// Every slot starts out empty (NULL) and is listed in freeThreads. The pool
// then hooks cleandUp() to the file-scope onWorkCompleted signal and
// scheduleFromQueue() to its own onWorkerAvailable signal.
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
	this->loop = loop;
	activeThreads = 0;

	// One boost::thread pointer per slot; all slots are initially free.
	worker = new boost::thread*[MAX_THREADS];
	int slot = 0;
	while (slot < MAX_THREADS) {
		worker[slot] = NULL;
		freeThreads.push(slot);
		++slot;
	}

	onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
	onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
// Releases everything the pool owns: the boost::thread object of every slot,
// the slot array itself, and any tasks still waiting in requestQueue.
// NOTE(review): worker threads are not joined here. What happens when a
// still-running boost::thread object is destroyed depends on the Boost.Thread
// version/configuration -- confirm this is acceptable at shutdown.
ThreadPool::~ThreadPool()
{
	for(int i=0; i<MAX_THREADS ; i++) {
		// Deleting a null pointer is a no-op, so empty slots need no check.
		delete worker[i];
	}
	// `worker` was allocated with new[] in the constructor, so it must be
	// released with delete[]; scalar delete here is undefined behaviour.
	delete[] worker;

	// Tasks that were queued but never dispatched are owned by the pool.
	while(!requestQueue.empty()) {
		Thread *t = requestQueue.front();
		requestQueue.pop();
		delete t;
	}
}
// Returns the current value of activeThreads, read while holding count_lock.
int ThreadPool::getActiveThreadCount()
{
	count_lock.lock();
	const int snapshot = activeThreads;
	count_lock.unlock();

	return snapshot;
}
// Adds `k` (which may be negative) to activeThreads while holding count_lock.
void ThreadPool::updateActiveThreadCount(int k)
{
	count_lock.lock();
	activeThreads = activeThreads + k;
	count_lock.unlock();
}
// Takes the next free worker slot out of freeThreads.
// Returns the slot index, or -1 when no slot is free. The whole operation
// runs under pool_lock. On success the counter is adjusted by -1.
// NOTE(review): the counter goes down on acquire and up in releaseThread(),
// so it appears to hold the negated number of busy slots -- confirm intent.
int ThreadPool::getFreeThread()
{
	pool_lock.lock();

	if (freeThreads.empty()) {
		pool_lock.unlock();
		return -1;
	}

	const int slot = freeThreads.front();
	freeThreads.pop();
	updateActiveThreadCount(-1);

	pool_lock.unlock();
	return slot;
}
// Returns worker slot `i` to the pool: destroys the slot's boost::thread
// object, clears the slot, puts the index back into freeThreads and adjusts
// the counter by +1. All of this happens under pool_lock.
// Indices outside [0, MAX_THREADS) are ignored.
void ThreadPool::releaseThread(int i)
{
	// Valid slots are 0 .. MAX_THREADS-1. The upper bound must be exclusive,
	// otherwise i == MAX_THREADS would index one past the end of `worker`.
	if(i < 0 || i >= MAX_THREADS) return;

	pool_lock.lock();

	delete worker[i];
	worker[i] = NULL;
	freeThreads.push(i);

	updateActiveThreadCount(1);

	pool_lock.unlock();
}
// Slot connected to onWorkCompleted in the constructor; Worker() posts the
// signal invocation to the event loop after the task's run() has returned.
//   t   - the finished task; it is finalized and then deleted here.
//   wid - worker slot the task ran in; it is handed back via releaseThread().
// The order matters: finalize() must run before the task is deleted, and the
// slot must be released before onWorkerAvailable fires, because that signal
// triggers scheduleFromQueue(), which looks for a free slot.
void ThreadPool::cleandUp(Thread *t, int wid)
{
	LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
	t->finalize();
	delete t;
	releaseThread(wid);
	onWorkerAvailable();	
}
void ThreadPool::scheduleFromQueue()
{
	criticalregion.lock();	
	while(!requestQueue.empty()) {
		int  w = getFreeThread();
		if(w == -1) break;
		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
		Thread *t = requestQueue.front(); requestQueue.pop();
		t->setThreadID(w);
		worker[w] = new boost::thread(Worker, t, w, loop);
		updateActiveThreadCount(-1);
	}
	criticalregion.unlock();
}
// Submits task `t` to the pool.
// If a worker slot is free, the task gets that slot's id and is started
// immediately on a new boost::thread running Worker(). Otherwise it is
// appended to requestQueue, from which scheduleFromQueue() later picks it up.
// NOTE(review): getFreeThread() has already applied -1 to the counter for the
// slot, and another -1 is applied here, while releaseThread() adds back only
// +1 -- the counter drifts by -1 per dispatched task; confirm intent.
// NOTE(review): the queue is pushed to without taking criticalregion, unlike
// scheduleFromQueue() -- fine only if both run on the same thread; confirm.
void ThreadPool::runAsThread(Thread *t)
{
	int slot = getFreeThread();

	if (slot == -1) {
		LOG4CXX_INFO(logger, "No workers available! adding to queue.")
		requestQueue.push(t);
		return;
	}

	LOG4CXX_INFO(logger, "Creating thread #" << slot)
	t->setThreadID(slot);
	worker[slot] = new boost::thread(Worker, t, slot, loop);
	updateActiveThreadCount(-1);
}
 |