Files @ 6e1af765cdc0
Branch filter:

Location: libtransport.git/libtransport/ThreadPool.cpp - annotation

Jan Kaluza
AdminInterface: Support labels for commands
78e71f9345c7
78e71f9345c7
78e71f9345c7
5adb3d1f9733
5adb3d1f9733
2b47d32a916c
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
5e19186950b9
2b47d32a916c
7500ab6c4c30
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
7500ab6c4c30
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
5e19186950b9
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
2b47d32a916c
5adb3d1f9733
5adb3d1f9733
#include "transport/ThreadPool.h"
#include "transport/Logging.h"

namespace Transport {

// Declares `logger`, the handle passed to the LOG4CXX_INFO statements in this file.
DEFINE_LOGGER(logger, "ThreadPool")
// Completion signal carrying the finished task and the worker id it ran under.
// Worker() posts an invocation of this signal to the event loop once a task's
// run() has returned, and the ThreadPool constructor connects
// ThreadPool::cleandUp to it.
// NOTE(review): this is a namespace-scope object, so every ThreadPool instance
// connects to the same signal - confirm that only one pool is ever created,
// otherwise each pool's cleandUp would be invoked for every finished task.
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;

// Body of every thread started by the pool.
//   t    - task to execute
//   wid  - index of the worker slot this thread occupies
//   loop - event loop that receives the completion notification
// Executes t->run() on the calling thread, then posts an event that invokes
// onWorkCompleted(t, wid) from the event loop.
static void Worker(Thread *t, int wid, Swift::EventLoop *loop)
{
	LOG4CXX_INFO(logger, "Starting thread " << wid)

	t->run();

	// The completion event is posted with an empty (null) event owner.
	const boost::shared_ptr<Swift::EventOwner> noOwner;
	loop->postEvent(boost::bind(boost::ref(onWorkCompleted), t, wid), noOwner);
}


// Builds a pool with `maxthreads` worker slots that reports completions
// through `loop`. All slots start out empty and are listed as free.
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
	this->loop = loop;
	activeThreads = 0;

	// Slot table: one boost::thread pointer per worker, released with free() in the destructor.
	worker = static_cast<boost::thread **>(malloc(MAX_THREADS * sizeof(boost::thread *)));

	int slot = 0;
	while (slot < MAX_THREADS) {
		worker[slot] = NULL;
		freeThreads.push(slot);
		++slot;
	}

	// A finished task triggers cleandUp; a freed worker triggers scheduleFromQueue.
	onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
	onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}

// Destroys the thread objects still held in the slot table, frees the table,
// and deletes every task that is still waiting in the request queue.
ThreadPool::~ThreadPool()
{
	// Empty slots hold NULL; deleting a null pointer does nothing.
	for (int slot = 0; slot < MAX_THREADS; ++slot) {
		delete worker[slot];
	}
	free(worker);

	// Requests that never got a worker are owned by the queue and destroyed here.
	while (!requestQueue.empty()) {
		Thread *pending = requestQueue.front();
		requestQueue.pop();
		delete pending;
	}
}

// Returns the current value of activeThreads, read while holding count_lock.
int ThreadPool::getActiveThreadCount()
{
	count_lock.lock();
	const int snapshot = activeThreads;
	count_lock.unlock();

	return snapshot;
}

// Adds k (which may be negative) to activeThreads while holding count_lock.
void ThreadPool::updateActiveThreadCount(int k)
{
	count_lock.lock();
	activeThreads = activeThreads + k;
	count_lock.unlock();
}

// Takes the next free worker slot out of freeThreads.
// Returns the slot index, or -1 when no slot is free. On success the
// activeThreads counter is adjusted by -1. The whole operation runs under
// pool_lock.
int ThreadPool::getFreeThread()
{
	pool_lock.lock();

	if (freeThreads.empty()) {
		pool_lock.unlock();
		return -1;
	}

	const int slot = freeThreads.front();
	freeThreads.pop();
	updateActiveThreadCount(-1);

	pool_lock.unlock();
	return slot;
}

// Returns worker slot `i` to the pool: destroys the thread object stored in
// the slot, clears the slot, puts the index back on freeThreads and adjusts
// the activeThreads counter by +1 (the inverse of getFreeThread()).
// Indices outside the slot table are ignored.
void ThreadPool::releaseThread(int i)
{
	// worker[] holds exactly MAX_THREADS entries, so the valid indices are
	// 0 .. MAX_THREADS-1; i == MAX_THREADS would touch memory past the table.
	if(i < 0 || i >= MAX_THREADS) return;
	pool_lock.lock();

	delete worker[i];
	worker[i] = NULL;
	freeThreads.push(i);

	updateActiveThreadCount(1);

	pool_lock.unlock();
}

// Slot connected to onWorkCompleted in the constructor; receives the finished
// task `t` and the worker slot `wid` it ran in.
// The statement order matters: the task is finalized before it is destroyed,
// and the slot is released before waiting requests are offered a worker.
void ThreadPool::cleandUp(Thread *t, int wid)
{
	LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
	// Give the task its finalize() step, then destroy it - `t` must not be used afterwards.
	t->finalize();
	delete t;
	// Hand the slot back to the pool.
	releaseThread(wid);
	// Emit onWorkerAvailable, which the constructor wired to scheduleFromQueue().
	onWorkerAvailable();	
}

// Starts queued requests for as long as both a request and a free worker slot
// are available. Connected to onWorkerAvailable in the constructor.
// The request queue is only touched while criticalregion is held.
void ThreadPool::scheduleFromQueue()
{
	criticalregion.lock();
	while(!requestQueue.empty()) {
		// Reserve a slot first; stop as soon as the pool is exhausted so the
		// remaining requests stay queued.
		int w = getFreeThread();
		if(w == -1) break;

		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
		Thread *t = requestQueue.front(); requestQueue.pop();
		t->setThreadID(w);
		worker[w] = new boost::thread(Worker, t, w, loop);
		// No counter update here: getFreeThread() has already applied the -1
		// for this slot and releaseThread() applies the matching +1. A second
		// decrement would make activeThreads drift by one per started task.
	}
	criticalregion.unlock();
}


// Runs task `t` on a pool thread. If a worker slot is free the task starts
// immediately in a new thread; otherwise it is appended to the request queue
// and started later by scheduleFromQueue().
void ThreadPool::runAsThread(Thread *t)
{
	// The slot check and the enqueue are done as one step under
	// criticalregion (the lock scheduleFromQueue() holds while it drains the
	// queue). This keeps the queue from being modified concurrently and
	// ensures a request queued here is seen by the next scheduleFromQueue()
	// pass instead of slipping in just after it found the queue empty.
	criticalregion.lock();
	int w = getFreeThread();
	if(w == -1) {
		LOG4CXX_INFO(logger, "No workers available! adding to queue.")
		requestQueue.push(t);
	}
	criticalregion.unlock();

	if(w != -1) {
		LOG4CXX_INFO(logger, "Creating thread #" << w)
		t->setThreadID(w);
		worker[w] = new boost::thread(Worker, t, w, loop);
		// No counter update here: getFreeThread() has already applied the -1
		// for this slot and releaseThread() applies the matching +1. A second
		// decrement would make activeThreads drift by one per started task.
	}
}

}